diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/config.go | 5 | ||||
-rw-r--r-- | plugins/http/constants.go | 2 | ||||
-rw-r--r-- | plugins/http/handler.go | 29 | ||||
-rw-r--r-- | plugins/http/plugin.go | 12 | ||||
-rw-r--r-- | plugins/kv/boltdb/config.go | 24 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin.go | 452 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin_unit_test.go | 531 | ||||
-rw-r--r-- | plugins/kv/interface.go | 41 | ||||
-rw-r--r-- | plugins/kv/memcached/config.go | 10 | ||||
-rw-r--r-- | plugins/kv/memcached/plugin.go | 252 | ||||
-rw-r--r-- | plugins/kv/memcached/storage_test.go | 432 | ||||
-rw-r--r-- | plugins/kv/memory/config.go | 15 | ||||
-rw-r--r-- | plugins/kv/memory/plugin.go | 262 | ||||
-rw-r--r-- | plugins/kv/memory/storage_test.go | 473 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 110 | ||||
-rw-r--r-- | plugins/reload/plugin.go | 2 | ||||
-rw-r--r-- | plugins/reload/watcher.go | 4 | ||||
-rw-r--r-- | plugins/server/config.go | 1 | ||||
-rw-r--r-- | plugins/server/interface.go | 1 | ||||
-rw-r--r-- | plugins/server/plugin.go | 3 |
20 files changed, 2637 insertions, 24 deletions
diff --git a/plugins/http/config.go b/plugins/http/config.go index 00d2940b..3b670c86 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -11,8 +11,10 @@ import ( poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) +// Cidrs is a slice of IPNet addresses type Cidrs []*net.IPNet +// IsTrusted checks if the ip address exists in the provided in the config addresses func (c *Cidrs) IsTrusted(ip string) bool { if len(*c) == 0 { return false @@ -137,7 +139,7 @@ func (c *Config) EnableFCGI() bool { return c.FCGI.Address != "" } -// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +// InitDefaults must populate Config values using given Config source. Must return error if Config is not valid. func (c *Config) InitDefaults() error { if c.Pool == nil { // default pool @@ -202,6 +204,7 @@ func (c *Config) InitDefaults() error { return c.Valid() } +// ParseCIDRs parse IPNet addresses and return slice of its func ParseCIDRs(subnets []string) (Cidrs, error) { c := make(Cidrs, 0, len(subnets)) for _, cidr := range subnets { diff --git a/plugins/http/constants.go b/plugins/http/constants.go index 773d1f46..c3d5c589 100644 --- a/plugins/http/constants.go +++ b/plugins/http/constants.go @@ -3,4 +3,6 @@ package http import "net/http" var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") + +// TrailerHeaderKey http header key var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/handler.go b/plugins/http/handler.go index 15954f96..9c40cdfc 100644 --- a/plugins/http/handler.go +++ b/plugins/http/handler.go @@ -23,13 +23,9 @@ const ( EventError ) +// MB is 1024 bytes const MB = 1024 * 1024 -type Handle interface { - AddListener(l events.Listener) - ServeHTTP(w http.ResponseWriter, r *http.Request) -} - // ErrorEvent represents singular http error event. type ErrorEvent struct { // Request contains client request, must not be stored. @@ -68,7 +64,7 @@ func (e *ResponseEvent) Elapsed() time.Duration { // Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, // parsed files and query, payload will include parsed form dataTree (if any). -type handler struct { +type Handler struct { maxRequestSize uint64 uploads UploadsConfig trusted Cidrs @@ -78,11 +74,12 @@ type handler struct { lsn events.Listener } -func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { +// NewHandler return handle interface implementation +func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (*Handler, error) { if pool == nil { return nil, errors.E(errors.Str("pool should be initialized")) } - return &handler{ + return &Handler{ maxRequestSize: maxReqSize * MB, uploads: uploads, pool: pool, @@ -90,8 +87,8 @@ func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool po }, nil } -// Listen attaches handler event controller. -func (h *handler) AddListener(l events.Listener) { +// AddListener attaches handler event controller. +func (h *Handler) AddListener(l events.Listener) { h.mul.Lock() defer h.mul.Unlock() @@ -99,7 +96,7 @@ func (h *handler) AddListener(l events.Listener) { } // mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { const op = errors.Op("ServeHTTP") start := time.Now() @@ -148,7 +145,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error { +func (h *Handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error { if length := r.Header.Get("content-length"); length != "" { if size, err := strconv.ParseInt(length, 10, 64); err != nil { h.handleError(w, r, err, start) @@ -162,7 +159,7 @@ func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Tim } // handleError sends error. -func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) { +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) { h.mul.Lock() defer h.mul.Unlock() // if pipe is broken, there is no sense to write the header @@ -186,19 +183,19 @@ func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error, } // handleResponse triggers response event. -func (h *handler) handleResponse(req *Request, resp *Response, start time.Time) { +func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { h.throw(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) } // throw invokes event handler if any. -func (h *handler) throw(event interface{}) { +func (h *Handler) throw(event interface{}) { if h.lsn != nil { h.lsn(event) } } // get real ip passing multiple proxy -func (h *handler) resolveIP(r *Request) { +func (h *Handler) resolveIP(r *Request) { if h.trusted.IsTrusted(r.RemoteAddr) == false { return } diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 2651f305..e6aba78b 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -44,7 +44,7 @@ type Middleware interface { type middleware map[string]Middleware -// Service manages pool, http servers. +// Plugin manages pool, http servers. The main http plugin structure type Plugin struct { sync.RWMutex @@ -60,7 +60,7 @@ type Plugin struct { pool pool.Pool // servers RR handler - handler Handle + handler *Handler // servers http *http.Server @@ -267,15 +267,17 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.RUnlock() } -// Server returns associated pool workers +// Workers returns associated pool workers func (s *Plugin) Workers() []worker.BaseProcess { return s.pool.Workers() } +// Name returns endure.Named interface implementation func (s *Plugin) Name() string { return PluginName } +// Reset destroys the old pool and replaces it with new one, waiting for old pool to die func (s *Plugin) Reset() error { s.Lock() defer s.Unlock() @@ -319,12 +321,14 @@ func (s *Plugin) Reset() error { return nil } +// Collects collecting http middlewares func (s *Plugin) Collects() []interface{} { return []interface{}{ s.AddMiddleware, } } +// AddMiddleware is base requirement for the middleware (name and Middleware) func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { s.mdwr[name.Name()] = m } @@ -414,7 +418,7 @@ func (s *Plugin) initSSL() *http.Server { hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X if hasGCMAsm { - // If AES-GCM hardware is provided then prioritise AES-GCM + // If AES-GCM hardware is provided then priorities AES-GCM // cipher suites. topCipherSuites = []uint16{ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go new file mode 100644 index 00000000..b2e1e636 --- /dev/null +++ b/plugins/kv/boltdb/config.go @@ -0,0 +1,24 @@ +package boltdb + +type Config struct { + // Dir is a directory to store the DB files + Dir string + // File is boltDB file. No need to create it by your own, + // boltdb driver is able to create the file, or read existing + File string + // Bucket to store data in boltDB + Bucket string + // db file permissions + Permissions int + // timeout + Interval uint `yaml:"interval"` +} + +// InitDefaults initializes default values for the boltdb +func (s *Config) InitDefaults() { + s.Dir = "." // current dir + s.Bucket = "rr" // default bucket name + s.File = "rr.db" // default file name + s.Permissions = 0777 // free for all + s.Interval = 60 // default is 60 seconds timeout +} diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go new file mode 100644 index 00000000..6cfc49f6 --- /dev/null +++ b/plugins/kv/boltdb/plugin.go @@ -0,0 +1,452 @@ +package boltdb + +import ( + "bytes" + "encoding/gob" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" + bolt "go.etcd.io/bbolt" +) + +const PluginName = "boltdb" + +// BoltDB K/V storage. +type Plugin struct { + // db instance + DB *bolt.DB + // name should be UTF-8 + bucket []byte + + // config for RR integration + cfg *Config + + // logger + log logger.Logger + + // gc contains key which are contain timeouts + gc *sync.Map + // default timeout for cache cleanup is 1 minute + timeout time.Duration + + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + const op = errors.Op("boltdb plugin init") + s.cfg = &Config{} + + s.cfg.InitDefaults() + + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + // set the logger + s.log = log + + db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) + if err != nil { + return errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb Update") + _, err = tx.CreateBucketIfNotExists([]byte(s.cfg.Bucket)) + if err != nil { + return errors.E(op, upOp) + } + return nil + }) + + if err != nil { + return errors.E(op, err) + } + + s.DB = db + s.bucket = []byte(s.cfg.Bucket) + s.stop = make(chan struct{}) + s.timeout = time.Duration(s.cfg.Interval) * time.Second + s.gc = &sync.Map{} + + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) + // start the TTL gc + go s.gcPhase() + + return errCh +} + +func (s *Plugin) Stop() error { + const op = errors.Op("boltdb stop") + err := s.Close() + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("boltdb Has") + s.log.Debug("boltdb HAS method called", "args", keys) + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + m := make(map[string]bool, len(keys)) + + // this is readable transaction + err := s.DB.View(func(tx *bolt.Tx) error { + // Get retrieves the value for a key in the bucket. + // Returns a nil value if the key does not exist or if the key is a nested bucket. + // The returned value is only valid for the life of the transaction. + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + exist := b.Get([]byte(keys[i])) + if exist != nil { + m[keys[i]] = true + } + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + s.log.Debug("boltdb HAS method finished") + return m, nil +} + +// Get retrieves the value for a key in the bucket. +// Returns a nil value if the key does not exist or if the key is a nested bucket. +// The returned value is only valid for the life of the transaction. +func (s *Plugin) Get(key string) ([]byte, error) { + const op = errors.Op("boltdb Get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + var val []byte + err := s.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + val = b.Get([]byte(key)) + + // try to decode values + if val != nil { + buf := bytes.NewReader(val) + decoder := gob.NewDecoder(buf) + + var i string + err := decoder.Decode(&i) + if err != nil { + // unsafe (w/o runes) convert + return errors.E(op, err) + } + + // set the value + val = []byte(i) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return val, nil +} + +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb MGet") + // defence + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + err := s.DB.View(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + buf := new(bytes.Buffer) + var out string + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) + if value != nil { + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = "" + } + } + + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + return m, nil +} + +// Set puts the K/V to the bolt +func (s *Plugin) Set(items ...kv.Item) error { + const op = errors.Op("boltdb Set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + // start writable transaction + tx, err := s.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(s.bucket) + // use access by index to avoid copying + for i := range items { + // performance note: pass a prepared bytes slice with initial cap + // we can't move buf and gob out of loop, because we need to clear both from data + // but gob will contain (w/o re-init) the past data + buf := bytes.Buffer{} + encoder := gob.NewEncoder(&buf) + if errors.Is(errors.EmptyItem, err) { + return errors.E(op, errors.EmptyItem) + } + + // Encode value + err = encoder.Encode(&items[i].Value) + if err != nil { + return errors.E(op, err) + } + // buf.Bytes will copy the underlying slice. Take a look in case of performance problems + err = b.Put([]byte(items[i].Key), buf.Bytes()) + if err != nil { + return errors.E(op, err) + } + + // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check + // we do not need mutex here, since we use sync.Map + if items[i].TTL != "" { + // check correctness of provided TTL + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + // Store key TTL in the separate map + s.gc.Store(items[i].Key, items[i].TTL) + } + + buf.Reset() + } + + return nil +} + +// Delete all keys from DB +func (s *Plugin) Delete(keys ...string) error { + const op = errors.Op("boltdb Delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + // start writable transaction + tx, err := s.DB.Begin(true) + if err != nil { + return errors.E(op, err) + } + + defer func() { + err = tx.Commit() + if err != nil { + errRb := tx.Rollback() + if errRb != nil { + s.log.Error("during the commit, Rollback error occurred", "commit error", err, "rollback error", errRb) + } + } + }() + + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + + for _, key := range keys { + err = b.Delete([]byte(key)) + if err != nil { + return errors.E(op, err) + } + } + + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (s *Plugin) MExpire(items ...kv.Item) error { + const op = errors.Op("boltdb MExpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + + s.gc.Store(items[i].Key, items[i].TTL) + } + return nil +} + +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("boltdb TTL") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if item, ok := s.gc.Load(keys[i]); ok { + // a little bit dangerous operation, but user can't store value other that kv.Item.TTL --> int64 + m[keys[i]] = item.(string) + } + } + return m, nil +} + +// Close the DB connection +func (s *Plugin) Close() error { + // stop the keys GC + s.stop <- struct{}{} + return s.DB.Close() +} + +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} + +// ========================= PRIVATE ================================= + +func (s *Plugin) gcPhase() { + t := time.NewTicker(s.timeout) + defer t.Stop() + for { + select { + case <-t.C: + // calculate current time before loop started to be fair + now := time.Now() + s.gc.Range(func(key, value interface{}) bool { + const op = errors.Op("gcPhase") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + return false + } + + if now.After(v) { + // time expired + s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) + err := s.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + err := b.Delete([]byte(k)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + s.log.Error("error during the gc phase of update", "error", err) + // todo this error is ignored, it means, that timer still be active + // to prevent this, we need to invoke t.Stop() + return false + } + } + return true + }) + case <-s.stop: + return + } + } +} diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go new file mode 100644 index 00000000..2459e493 --- /dev/null +++ b/plugins/kv/boltdb/plugin_unit_test.go @@ -0,0 +1,531 @@ +package boltdb + +import ( + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/stretchr/testify/assert" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +// NewBoltClient instantiate new BOLTDB client +// The parameters are: +// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created +// perm os.FileMode -- file permissions, for example 0777 +// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other +// bucket string -- name of the bucket to use, should be UTF-8 +func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { + const op = errors.Op("newBoltClient") + db, err := bolt.Open(path, perm, options) + if err != nil { + return nil, errors.E(op, err) + } + + // bucket should be SET + if bucket == "" { + return nil, errors.E(op, errors.Str("bucket should be set")) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + // if TTL is not set, make it default + if ttl == 0 { + ttl = time.Minute + } + + l, _ := zap.NewDevelopment() + s := &Plugin{ + DB: db, + bucket: []byte(bucket), + stop: make(chan struct{}), + timeout: ttl, + gc: &sync.Map{}, + log: logger.NewZapAdapter(l), + } + + // start the TTL gc + go s.gcPhase() + + return s, nil +} + +func initStorage() kv.Storage { + storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second) + if err != nil { + panic(err) + } + return storage +} + +func cleanup(t *testing.T, path string) { + err := os.RemoveAll(path) + if err != nil { + t.Fatal(err) + } +} + +func TestStorage_Has(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) +} + +func TestStorage_Has_Set_Has(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) +} + +func TestConcurrentReadWriteTransactions(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + wg := &sync.WaitGroup{} + wg.Add(3) + + m := &sync.RWMutex{} + // concurrently set the keys + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + // set is writable transaction + // it should stop readable + assert.NoError(t, s.Set(kv.Item{ + Key: "key" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + }, kv.Item{ + Key: "key2" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + })) + m.Unlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.RLock() + v, err = s.Has("key") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + m.RUnlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + err = s.Delete("key" + strconv.Itoa(i)) + assert.NoError(t, err) + m.Unlock() + } + }(s) + + wg.Wait() +} + +func TestStorage_Has_Set_MGet(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_Has_Set_Get(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world2", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.Get("key") + assert.NoError(t, err) + + if string(res) != "hello world" { + t.Fatal("wrong value by key") + } +} + +func TestStorage_Set_Del_Get(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + // check that keys are present + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) + + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not present + res, err = s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 0) +} + +func TestStorage_Set_GetM(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestNilAndWrongArgs(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + // check + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + _, err = s.Has("") + assert.Error(t, err) + + _, err = s.Get("") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", "") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", " ") + assert.Error(t, err) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + })) + + assert.Error(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "asdf", + })) + + _, err = s.Has("key") + assert.NoError(t, err) + + assert.Error(t, s.Set(kv.Item{})) + + err = s.Delete("") + assert.Error(t, err) + + err = s.Delete("key", "") + assert.Error(t, err) + + err = s.Delete("key", " ") + assert.Error(t, err) + + err = s.Delete("key") + assert.NoError(t, err) +} + +func TestStorage_MExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + // set timeout to 5 sec + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + i1 := kv.Item{ + Key: "key", + Value: "", + TTL: nowPlusFive, + } + i2 := kv.Item{ + Key: "key2", + Value: "", + TTL: nowPlusFive, + } + assert.NoError(t, s.MExpire(i1, i2)) + + time.Sleep(time.Second * 6) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestStorage_SetExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + cleanup(t, "rr.db") + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + // set timeout to 5 sec + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: nowPlusFive, + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: nowPlusFive, + })) + + time.Sleep(time.Second * 2) + m, err := s.TTL("key", "key2") + assert.NoError(t, err) + + // remove a precision 4.02342342 -> 4 + keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) + if err != nil { + t.Fatal(err) + } + + // remove a precision 4.02342342 -> 4 + key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) + if err != nil { + t.Fatal(err) + } + + assert.True(t, keyTTL < 5) + assert.True(t, key2TTL < 5) + + time.Sleep(time.Second * 4) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go new file mode 100644 index 00000000..c1367cdf --- /dev/null +++ b/plugins/kv/interface.go @@ -0,0 +1,41 @@ +package kv + +// Item represents general storage item +type Item struct { + // Key of item + Key string + // Value of item + Value string + // live until time provided by TTL in RFC 3339 format + TTL string +} + +// Storage represents single abstract storage. +type Storage interface { + // Has checks if value exists. + Has(keys ...string) (map[string]bool, error) + + // Get loads value content into a byte slice. + Get(key string) ([]byte, error) + + // MGet loads content of multiple values + // Returns the map with existing keys and associated values + MGet(keys ...string) (map[string]interface{}, error) + + // Set used to upload item to KV with TTL + // 0 value in TTL means no TTL + Set(items ...Item) error + + // MExpire sets the TTL for multiply keys + MExpire(items ...Item) error + + // TTL return the rest time to live for provided keys + // Not supported for the memcached and boltdb + TTL(keys ...string) (map[string]interface{}, error) + + // Delete one or multiple keys. + Delete(keys ...string) error + + // Close closes the storage and underlying resources. + Close() error +} diff --git a/plugins/kv/memcached/config.go b/plugins/kv/memcached/config.go new file mode 100644 index 00000000..62f29ef2 --- /dev/null +++ b/plugins/kv/memcached/config.go @@ -0,0 +1,10 @@ +package memcached + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + s.Addr = []string{"localhost:11211"} // default url for memcached // init logger +} diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go new file mode 100644 index 00000000..f5111c04 --- /dev/null +++ b/plugins/kv/memcached/plugin.go @@ -0,0 +1,252 @@ +package memcached + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "memcached" + +var EmptyItem = kv.Item{} + +type Plugin struct { + // config + cfg *Config + // logger + log logger.Logger + // memcached client + client *memcache.Client +} + +// NewMemcachedClient returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedClient(url string) kv.Storage { + m := memcache.New(url) + return &Plugin{ + client: m, + } +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + const op = errors.Op("memcached init") + s.cfg = &Config{} + s.cfg.InitDefaults() + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + s.log = log + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) + s.client = memcache.New(s.cfg.Addr...) + return errCh +} + +// Memcached has no stop/close or smt similar to close the connection +func (s *Plugin) Stop() error { + return nil +} + +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + +// Has checks the key for existence +func (s Plugin) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached Has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := s.client.Get(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil && err != memcache.ErrCacheMiss { + return nil, err + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (s Plugin) Get(key string) ([]byte, error) { + const op = errors.Op("memcached Get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := s.client.Get(key) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil && err != memcache.ErrCacheMiss { + return nil, err + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil by some reason and error also nil + return nil, nil +} + +// return map with key -- string +// and map value as value -- []byte +func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached MGet") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := s.client.Get(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil && err != memcache.ErrCacheMiss { + return nil, err + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (s Plugin) Set(items ...kv.Item) error { + const op = errors.Op("memcached Set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == EmptyItem { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: []byte(items[i].Value), + Flags: 0, + } + + // add additional TTL in case of TTL isn't empty + if items[i].TTL != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := s.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (s Plugin) MExpire(items ...kv.Item) error { + const op = errors.Op("memcached MExpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length. + err = s.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return err + } + } + return nil +} + +// return time in seconds (int32) for a given keys +func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached HTTLas") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (s Plugin) Delete(keys ...string) error { + const op = errors.Op("memcached Has") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := s.client.Delete(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil && err != memcache.ErrCacheMiss { + return err + } + } + return nil +} + +func (s Plugin) Close() error { + return nil +} diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go new file mode 100644 index 00000000..3d37748b --- /dev/null +++ b/plugins/kv/memcached/storage_test.go @@ -0,0 +1,432 @@ +package memcached + +import ( + "strconv" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/stretchr/testify/assert" +) + +func initStorage() kv.Storage { + return NewMemcachedClient("localhost:11211") +} + +func cleanup(t *testing.T, s kv.Storage, keys ...string) { + err := s.Delete(keys...) + if err != nil { + t.Fatalf("error during cleanup: %s", err.Error()) + } +} + +func TestStorage_Has(t *testing.T) { + s := initStorage() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) +} + +func TestStorage_Has_Set_Has(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) +} + +func TestStorage_Has_Set_MGet(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_Has_Set_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.Get("key") + assert.NoError(t, err) + + if string(res) != "hello world" { + t.Fatal("wrong value by key") + } +} + +func TestStorage_Set_Del_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + // check that keys are present + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) + + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not present + res, err = s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 0) +} + +func TestStorage_Set_GetM(t *testing.T) { + s := initStorage() + + defer func() { + cleanup(t, s, "key", "key2") + + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_MExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + // set timeout to 5 sec + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + i1 := kv.Item{ + Key: "key", + Value: "", + TTL: nowPlusFive, + } + i2 := kv.Item{ + Key: "key2", + Value: "", + TTL: nowPlusFive, + } + assert.NoError(t, s.MExpire(i1, i2)) + + time.Sleep(time.Second * 6) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestNilAndWrongArgs(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key") + if err := s.Close(); err != nil { + panic(err) + } + }() + + // check + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + _, err = s.Has("") + assert.Error(t, err) + + _, err = s.Get("") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", "") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", " ") + assert.Error(t, err) + + assert.Error(t, s.Set(kv.Item{})) + + err = s.Delete("") + assert.Error(t, err) + + err = s.Delete("key", "") + assert.Error(t, err) + + err = s.Delete("key", " ") + assert.Error(t, err) + + err = s.Delete("key") + assert.NoError(t, err) +} + +func TestStorage_SetExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + // set timeout to 5 sec + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: nowPlusFive, + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: nowPlusFive, + })) + + time.Sleep(time.Second * 6) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestConcurrentReadWriteTransactions(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + wg := &sync.WaitGroup{} + wg.Add(3) + + m := &sync.RWMutex{} + // concurrently set the keys + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + // set is writable transaction + // it should stop readable + assert.NoError(t, s.Set(kv.Item{ + Key: "key" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + }, kv.Item{ + Key: "key2" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + })) + m.Unlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.RLock() + v, err = s.Has("key") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + m.RUnlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + err = s.Delete("key" + strconv.Itoa(i)) + assert.NoError(t, err) + m.Unlock() + } + }(s) + + wg.Wait() +} diff --git a/plugins/kv/memory/config.go b/plugins/kv/memory/config.go new file mode 100644 index 00000000..0816f734 --- /dev/null +++ b/plugins/kv/memory/config.go @@ -0,0 +1,15 @@ +package memory + +// Config is default config for the in-memory driver +type Config struct { + // Enabled or disabled (true or false) + Enabled bool + // Interval for the check + Interval int +} + +// InitDefaults by default driver is turned off +func (c *Config) InitDefaults() { + c.Enabled = false + c.Interval = 60 // seconds +} diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/memory/plugin.go new file mode 100644 index 00000000..d2d3721b --- /dev/null +++ b/plugins/kv/memory/plugin.go @@ -0,0 +1,262 @@ +package memory + +import ( + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// PluginName is user friendly name for the plugin +const PluginName = "memory" + +type Plugin struct { + // heap is user map for the key-value pairs + heap sync.Map + stop chan struct{} + + log logger.Logger + cfg *Config +} + +func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("in-memory storage init") + s.cfg = &Config{} + s.cfg.InitDefaults() + + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + s.log = log + + s.stop = make(chan struct{}, 1) + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) + // start in-memory gc for kv + go s.gc() + + return errCh +} + +func (s *Plugin) Stop() error { + const op = errors.Op("in-memory storage stop") + err := s.Close() + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("in-memory storage Has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if _, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = true + } + } + + return m, nil +} + +func (s *Plugin) Get(key string) ([]byte, error) { + const op = errors.Op("in-memory storage Get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + if data, exist := s.heap.Load(key); exist { + // here might be a panic + // but data only could be a string, see Set function + return []byte(data.(kv.Item).Value), nil + } + return nil, nil +} + +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("in-memory storage MGet") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if value, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = value.(kv.Item).Value + } + } + + return m, nil +} + +func (s *Plugin) Set(items ...kv.Item) error { + const op = errors.Op("in-memory storage Set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + // TTL is set + if items[i].TTL != "" { + // check the TTL in the item + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + } + + s.heap.Store(items[i].Key, items[i]) + } + return nil +} + +// MExpire sets the expiration time to the key +// If key already has the expiration time, it will be overwritten +func (s *Plugin) MExpire(items ...kv.Item) error { + const op = errors.Op("in-memory storage MExpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // if key exist, overwrite it value + if pItem, ok := s.heap.Load(items[i].Key); ok { + // check that time is correct + _, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + tmp := pItem.(kv.Item) + // guess that t is in the future + // in memory is just FOR TESTING PURPOSES + // LOGIC ISN'T IDEAL + s.heap.Store(items[i].Key, kv.Item{ + Key: items[i].Key, + Value: tmp.Value, + TTL: items[i].TTL, + }) + } + } + + return nil +} + +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("in-memory storage TTL") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for i := range keys { + if item, ok := s.heap.Load(keys[i]); ok { + m[keys[i]] = item.(kv.Item).TTL + } + } + return m, nil +} + +func (s *Plugin) Delete(keys ...string) error { + const op = errors.Op("in-memory storage Delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + s.heap.Delete(keys[i]) + } + return nil +} + +// Close clears the in-memory storage +func (s *Plugin) Close() error { + s.stop <- struct{}{} + return nil +} + +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + +// ================================== PRIVATE ====================================== + +func (s *Plugin) gc() { + // TODO check + ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) + for { + select { + case <-s.stop: + ticker.Stop() + return + case now := <-ticker.C: + // check every second + s.heap.Range(func(key, value interface{}) bool { + v := value.(kv.Item) + if v.TTL == "" { + return true + } + + t, err := time.Parse(time.RFC3339, v.TTL) + if err != nil { + return false + } + + if now.After(t) { + s.log.Debug("key deleted", "key", key) + s.heap.Delete(key) + } + return true + }) + } + } +} diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go new file mode 100644 index 00000000..d3b24860 --- /dev/null +++ b/plugins/kv/memory/storage_test.go @@ -0,0 +1,473 @@ +package memory + +import ( + "strconv" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func initStorage() kv.Storage { + p := &Plugin{ + stop: make(chan struct{}), + } + p.cfg = &Config{ + Enabled: true, + Interval: 1, + } + + l, _ := zap.NewDevelopment() + p.log = logger.NewZapAdapter(l) + + go p.gc() + + return p +} + +func cleanup(t *testing.T, s kv.Storage, keys ...string) { + err := s.Delete(keys...) + if err != nil { + t.Fatalf("error during cleanup: %s", err.Error()) + } +} + +func TestStorage_Has(t *testing.T) { + s := initStorage() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) +} + +func TestStorage_Has_Set_Has(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) +} + +func TestStorage_Has_Set_MGet(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_Has_Set_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.Get("key") + assert.NoError(t, err) + + if string(res) != "value" { + t.Fatal("wrong value by key") + } +} + +func TestStorage_Set_Del_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + // check that keys are present + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) + + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm + res, err = s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 0) +} + +func TestStorage_Set_GetM(t *testing.T) { + s := initStorage() + + defer func() { + cleanup(t, s, "key", "key2") + + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: "", + })) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_MExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + // set timeout to 5 sec + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + i1 := kv.Item{ + Key: "key", + Value: "", + TTL: nowPlusFive, + } + i2 := kv.Item{ + Key: "key2", + Value: "", + TTL: nowPlusFive, + } + assert.NoError(t, s.MExpire(i1, i2)) + + time.Sleep(time.Second * 6) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestNilAndWrongArgs(t *testing.T) { + s := initStorage() + defer func() { + if err := s.Close(); err != nil { + panic(err) + } + }() + + // check + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + _, err = s.Has("") + assert.Error(t, err) + + _, err = s.Get("") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", "") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", " ") + assert.Error(t, err) + + assert.NoError(t, s.Set(kv.Item{})) + _, err = s.Has("key") + assert.NoError(t, err) + + err = s.Delete("") + assert.Error(t, err) + + err = s.Delete("key", "") + assert.Error(t, err) + + err = s.Delete("key", " ") + assert.Error(t, err) + + err = s.Delete("key") + assert.NoError(t, err) +} + +func TestStorage_SetExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + // set timeout to 5 sec + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: nowPlusFive, + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: nowPlusFive, + })) + + time.Sleep(time.Second * 2) + m, err := s.TTL("key", "key2") + assert.NoError(t, err) + + // remove a precision 4.02342342 -> 4 + keyTTL, err := strconv.Atoi(m["key"].(string)[0:1]) + if err != nil { + t.Fatal(err) + } + + // remove a precision 4.02342342 -> 4 + key2TTL, err := strconv.Atoi(m["key"].(string)[0:1]) + if err != nil { + t.Fatal(err) + } + + assert.True(t, keyTTL < 5) + assert.True(t, key2TTL < 5) + + time.Sleep(time.Second * 4) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestConcurrentReadWriteTransactions(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + wg := &sync.WaitGroup{} + wg.Add(3) + + m := &sync.RWMutex{} + // concurrently set the keys + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + // set is writable transaction + // it should stop readable + assert.NoError(t, s.Set(kv.Item{ + Key: "key" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + }, kv.Item{ + Key: "key2" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + })) + m.Unlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.RLock() + v, err = s.Has("key") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + m.RUnlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 1000; i++ { + m.Lock() + err = s.Delete("key" + strconv.Itoa(i)) + assert.NoError(t, err) + m.Unlock() + } + }(s) + + wg.Wait() +} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go new file mode 100644 index 00000000..751f0d12 --- /dev/null +++ b/plugins/kv/rpc.go @@ -0,0 +1,110 @@ +package kv + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Wrapper for the plugin +type RPCServer struct { + // svc is a plugin implementing Storage interface + svc Storage + // Logger + log logger.Logger +} + +// NewRPCServer construct RPC server for the particular plugin +func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { + return &RPCServer{ + svc: srv, + log: log, + } +} + +// data Data +func (r *RPCServer) Has(in []string, res *map[string]bool) error { + const op = errors.Op("rpc server Has") + ret, err := r.svc.Has(in...) + if err != nil { + return errors.E(op, err) + } + + // update the value in the pointer + *res = ret + return nil +} + +// in SetData +func (r *RPCServer) Set(in []Item, ok *bool) error { + const op = errors.Op("rpc server Set") + + err := r.svc.Set(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server MGet") + ret, err := r.svc.MGet(in...) + if err != nil { + return errors.E(op, err) + } + + // update return value + *res = ret + return nil +} + +// in Data +func (r *RPCServer) MExpire(in []Item, ok *bool) error { + const op = errors.Op("rpc server MExpire") + + err := r.svc.MExpire(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server TTL") + + ret, err := r.svc.TTL(in...) + if err != nil { + return errors.E(op, err) + } + + *res = ret + return nil +} + +// in Data +func (r *RPCServer) Delete(in []string, ok *bool) error { + const op = errors.Op("rpc server Delete") + err := r.svc.Delete(in...) + if err != nil { + return errors.E(op, err) + } + *ok = true + return nil +} + +// in string, storages +func (r *RPCServer) Close(storage string, ok *bool) error { + const op = errors.Op("rpc server Close") + err := r.svc.Close() + if err != nil { + return errors.E(op, err) + } + *ok = true + + return nil +} diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go index 452e03a3..eb1b61b2 100644 --- a/plugins/reload/plugin.go +++ b/plugins/reload/plugin.go @@ -57,7 +57,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Res return nil } } - return errors.E(op, errors.Skip) + return errors.E(op, errors.SkipFile) }, Files: make(map[string]os.FileInfo), Ignored: ignored, diff --git a/plugins/reload/watcher.go b/plugins/reload/watcher.go index c232f16f..08c85af9 100644 --- a/plugins/reload/watcher.go +++ b/plugins/reload/watcher.go @@ -179,7 +179,7 @@ outer: // if filename does not contain pattern --> ignore that file if w.watcherConfigs[serviceName].FilePatterns != nil && w.watcherConfigs[serviceName].FilterHooks != nil { err = w.watcherConfigs[serviceName].FilterHooks(fileInfoList[i].Name(), w.watcherConfigs[serviceName].FilePatterns) - if errors.Is(errors.Skip, err) { + if errors.Is(errors.SkipFile, err) { continue outer } } @@ -293,7 +293,7 @@ func (w *Watcher) retrieveFilesRecursive(serviceName, root string) (map[string]o // if filename does not contain pattern --> ignore that file err = w.watcherConfigs[serviceName].FilterHooks(info.Name(), w.watcherConfigs[serviceName].FilePatterns) - if errors.Is(errors.Skip, err) { + if errors.Is(errors.SkipFile, err) { return nil } diff --git a/plugins/server/config.go b/plugins/server/config.go index 4bef3c5f..2bf30e70 100644 --- a/plugins/server/config.go +++ b/plugins/server/config.go @@ -28,6 +28,7 @@ type Config struct { RelayTimeout time.Duration } +// InitDefaults for the server config func (cfg *Config) InitDefaults() { if cfg.Relay == "" { cfg.Relay = "pipes" diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 98345694..a2d8b92b 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -10,6 +10,7 @@ import ( poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" ) +// Env variables type alias type Env map[string]string // Server creates workers for the application. diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 5d1f26d3..8a843723 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -21,6 +21,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) +// PluginName for the server const PluginName = "server" // Plugin manages worker @@ -53,11 +54,13 @@ func (server *Plugin) Name() string { return PluginName } +// Serve (Start) server plugin (just a mock here to satisfy interface) func (server *Plugin) Serve() chan error { errCh := make(chan error, 1) return errCh } +// Stop used to close chosen in config factory func (server *Plugin) Stop() error { if server.factory == nil { return nil |