summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-11 14:06:10 +0300
committerGitHub <[email protected]>2021-02-11 14:06:10 +0300
commit8f6cafdc0948a5ea13bf9a811b576aa4b3ef7e4a (patch)
tree92727c3ff8087597bac65eee2c26c9484c98be7f /plugins
parent7978c59f0ed286912bfcaec81b76e54532b1a9bf (diff)
parent509abc76a0f7b88678de67843ca79d9052ad8dd6 (diff)
Merge pull request #530 from spiral/release_stabilizationv2.0.0-RC.1
stabilization(RC): rc stabilization
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/config/http.go2
-rw-r--r--plugins/http/plugin.go35
-rw-r--r--plugins/kv/memcached/plugin.go40
-rw-r--r--plugins/server/plugin.go12
4 files changed, 51 insertions, 38 deletions
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go
index bfbc1af6..022476e2 100644
--- a/plugins/http/config/http.go
+++ b/plugins/http/config/http.go
@@ -73,7 +73,7 @@ func (c *HTTP) InitDefaults() error {
c.Pool = &poolImpl.Config{
Debug: false,
NumWorkers: uint64(runtime.NumCPU()),
- MaxJobs: 1000,
+ MaxJobs: 0,
AllocateTimeout: time.Second * 60,
DestroyTimeout: time.Second * 60,
Supervisor: nil,
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 3672f5ac..bab03edc 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -102,19 +102,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
s.cfg.Env[RR_MODE] = "http"
-
- s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
- if err != nil {
- return errors.E(op, err)
- }
-
s.server = server
return nil
@@ -141,6 +128,19 @@ func (s *Plugin) Serve() chan error {
errCh := make(chan error, 2)
var err error
+ s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: s.cfg.Pool.Debug,
+ NumWorkers: s.cfg.Pool.NumWorkers,
+ MaxJobs: s.cfg.Pool.MaxJobs,
+ AllocateTimeout: s.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: s.cfg.Pool.DestroyTimeout,
+ Supervisor: s.cfg.Pool.Supervisor,
+ }, s.cfg.Env, s.logCallback)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
s.handler, err = NewHandler(
s.cfg.MaxRequestSize,
*s.cfg.Uploads,
@@ -303,12 +303,9 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Workers returns associated pool workers
func (s *Plugin) Workers() []worker.BaseProcess {
- workers := s.pool.Workers()
- baseWorkers := make([]worker.BaseProcess, 0, len(workers))
- for i := 0; i < len(workers); i++ {
- baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl)))
- }
- return baseWorkers
+ s.Lock()
+ defer s.Unlock()
+ return s.pool.Workers()
}
// Name returns endure.Named interface implementation
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
index 181b8a49..b8392f9e 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/memcached/plugin.go
@@ -84,9 +84,13 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
exist, err := s.client.Get(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
}
if exist != nil {
m[keys[i]] = true
@@ -105,9 +109,12 @@ func (s *Plugin) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
data, err := s.client.Get(key)
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ return nil, nil
+ }
+ return nil, errors.E(op, err)
}
if data != nil {
// return the value by the key
@@ -137,9 +144,12 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
// Here also MultiGet
data, err := s.client.Get(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return nil, err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
}
if data != nil {
m[keys[i]] = data.Value
@@ -205,7 +215,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
// verify provided TTL
t, err := time.Parse(time.RFC3339, items[i].TTL)
if err != nil {
- return err
+ return errors.E(op, err)
}
// Touch updates the expiry for the given key. The seconds parameter is either
@@ -215,7 +225,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error {
// The key must be at most 250 bytes in length.
err = s.client.Touch(items[i].Key, int32(t.Unix()))
if err != nil {
- return err
+ return errors.E(op, err)
}
}
return nil
@@ -244,8 +254,12 @@ func (s *Plugin) Delete(keys ...string) error {
for i := range keys {
err := s.client.Delete(keys[i])
// ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil && err != memcache.ErrCacheMiss {
- return err
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return errors.E(op, err)
}
}
return nil
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 99d93d19..95e593b8 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -49,11 +49,6 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
server.cfg.InitDefaults()
server.log = log
- server.factory, err = server.initFactory()
- if err != nil {
- return errors.E(err)
- }
-
return nil
}
@@ -64,7 +59,14 @@ func (server *Plugin) Name() string {
// Serve (Start) server plugin (just a mock here to satisfy interface)
func (server *Plugin) Serve() chan error {
+ const op = errors.Op("server_plugin_serve")
errCh := make(chan error, 1)
+ var err error
+ server.factory, err = server.initFactory()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
return errCh
}