diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/config/http.go | 2 | ||||
-rw-r--r-- | plugins/http/plugin.go | 35 | ||||
-rw-r--r-- | plugins/kv/memcached/plugin.go | 40 | ||||
-rw-r--r-- | plugins/server/plugin.go | 12 |
4 files changed, 51 insertions, 38 deletions
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go index bfbc1af6..022476e2 100644 --- a/plugins/http/config/http.go +++ b/plugins/http/config/http.go @@ -73,7 +73,7 @@ func (c *HTTP) InitDefaults() error { c.Pool = &poolImpl.Config{ Debug: false, NumWorkers: uint64(runtime.NumCPU()), - MaxJobs: 1000, + MaxJobs: 0, AllocateTimeout: time.Second * 60, DestroyTimeout: time.Second * 60, Supervisor: nil, diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 3672f5ac..bab03edc 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -102,19 +102,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } s.cfg.Env[RR_MODE] = "http" - - s.pool, err = server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) - if err != nil { - return errors.E(op, err) - } - s.server = server return nil @@ -141,6 +128,19 @@ func (s *Plugin) Serve() chan error { errCh := make(chan error, 2) var err error + s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: s.cfg.Pool.Debug, + NumWorkers: s.cfg.Pool.NumWorkers, + MaxJobs: s.cfg.Pool.MaxJobs, + AllocateTimeout: s.cfg.Pool.AllocateTimeout, + DestroyTimeout: s.cfg.Pool.DestroyTimeout, + Supervisor: s.cfg.Pool.Supervisor, + }, s.cfg.Env, s.logCallback) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + s.handler, err = NewHandler( s.cfg.MaxRequestSize, *s.cfg.Uploads, @@ -303,12 +303,9 @@ func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Workers returns associated pool workers func (s *Plugin) Workers() []worker.BaseProcess { - workers := s.pool.Workers() - baseWorkers := make([]worker.BaseProcess, 0, len(workers)) - for i := 0; i < len(workers); i++ { - baseWorkers = append(baseWorkers, worker.FromSync(workers[i].(*worker.SyncWorkerImpl))) - } - return baseWorkers + s.Lock() + defer s.Unlock() + return s.pool.Workers() } // Name returns endure.Named interface implementation diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go index 181b8a49..b8392f9e 100644 --- a/plugins/kv/memcached/plugin.go +++ b/plugins/kv/memcached/plugin.go @@ -84,9 +84,13 @@ func (s *Plugin) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } exist, err := s.client.Get(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) } if exist != nil { m[keys[i]] = true @@ -105,9 +109,12 @@ func (s *Plugin) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } data, err := s.client.Get(key) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) } if data != nil { // return the value by the key @@ -137,9 +144,12 @@ func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { // Here also MultiGet data, err := s.client.Get(keys[i]) - // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return nil, err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) } if data != nil { m[keys[i]] = data.Value @@ -205,7 +215,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { // verify provided TTL t, err := time.Parse(time.RFC3339, items[i].TTL) if err != nil { - return err + return errors.E(op, err) } // Touch updates the expiry for the given key. The seconds parameter is either @@ -215,7 +225,7 @@ func (s *Plugin) MExpire(items ...kv.Item) error { // The key must be at most 250 bytes in length. err = s.client.Touch(items[i].Key, int32(t.Unix())) if err != nil { - return err + return errors.E(op, err) } } return nil @@ -244,8 +254,12 @@ func (s *Plugin) Delete(keys ...string) error { for i := range keys { err := s.client.Delete(keys[i]) // ErrCacheMiss means that a Get failed because the item wasn't present. - if err != nil && err != memcache.ErrCacheMiss { - return err + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) } } return nil diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 99d93d19..95e593b8 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -49,11 +49,6 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { server.cfg.InitDefaults() server.log = log - server.factory, err = server.initFactory() - if err != nil { - return errors.E(err) - } - return nil } @@ -64,7 +59,14 @@ func (server *Plugin) Name() string { // Serve (Start) server plugin (just a mock here to satisfy interface) func (server *Plugin) Serve() chan error { + const op = errors.Op("server_plugin_serve") errCh := make(chan error, 1) + var err error + server.factory, err = server.initFactory() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } return errCh } |