diff options
Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r-- | plugins/http/plugin.go | 224 |
1 files changed, 112 insertions, 112 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index d5324246..8bcffb63 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -71,47 +71,47 @@ type Plugin struct { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { const op = errors.Op("http_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &s.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, err) } - err = s.cfg.InitDefaults() + err = p.cfg.InitDefaults() if err != nil { return errors.E(op, err) } // rr logger (via plugin) - s.log = rrLogger + p.log = rrLogger // use time and date in UTC format - s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) + p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) - s.mdwr = make(map[string]Middleware) + p.mdwr = make(map[string]Middleware) - if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() { + if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() { return errors.E(op, errors.Disabled) } // init if nil - if s.cfg.Env == nil { - s.cfg.Env = make(map[string]string) + if p.cfg.Env == nil { + p.cfg.Env = make(map[string]string) } - s.cfg.Env[RrMode] = "http" - s.server = server + p.cfg.Env[RrMode] = "http" + p.server = server return nil } -func (s *Plugin) logCallback(event interface{}) { +func (p *Plugin) logCallback(event interface{}) { if ev, ok := event.(handler.ResponseEvent); ok { - s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), + p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), "remote", ev.Request.RemoteAddr, "elapsed", ev.Elapsed().String(), ) @@ -119,60 +119,60 @@ func (s *Plugin) logCallback(event interface{}) { } // Serve serves the svc. -func (s *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 2) // run whole process in the goroutine go func() { // protect http initialization - s.Lock() - s.serve(errCh) - s.Unlock() + p.Lock() + p.serve(errCh) + p.Unlock() }() return errCh } -func (s *Plugin) serve(errCh chan error) { +func (p *Plugin) serve(errCh chan error) { var err error const op = errors.Op("http_plugin_serve") - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { errCh <- errors.E(op, err) return } - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) if err != nil { errCh <- errors.E(op, err) return } - s.handler.AddListener(s.logCallback) + p.handler.AddListener(p.logCallback) - if s.cfg.EnableHTTP() { - if s.cfg.EnableH2C() { - s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog} + if p.cfg.EnableHTTP() { + if p.cfg.EnableH2C() { + p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog} } else { - s.http = &http.Server{Handler: s, ErrorLog: s.stdLog} + p.http = &http.Server{Handler: p, ErrorLog: p.stdLog} } } - if s.cfg.EnableTLS() { - s.https = s.initSSL() - if s.cfg.SSLConfig.RootCA != "" { - err = s.appendRootCa() + if p.cfg.EnableTLS() { + p.https = p.initSSL() + if p.cfg.SSLConfig.RootCA != "" { + err = p.appendRootCa() if err != nil { errCh <- errors.E(op, err) return @@ -180,102 +180,102 @@ func (s *Plugin) serve(errCh chan error) { } // if HTTP2Config not nil - if s.cfg.HTTP2Config != nil { - if err := s.initHTTP2(); err != nil { + if p.cfg.HTTP2Config != nil { + if err := p.initHTTP2(); err != nil { errCh <- errors.E(op, err) return } } } - if s.cfg.EnableFCGI() { - s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog} + if p.cfg.EnableFCGI() { + p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog} } // start http, https and fcgi servers if requested in the config go func() { - s.serveHTTP(errCh) + p.serveHTTP(errCh) }() go func() { - s.serveHTTPS(errCh) + p.serveHTTPS(errCh) }() go func() { - s.serveFCGI(errCh) + p.serveFCGI(errCh) }() } // Stop stops the http. -func (s *Plugin) Stop() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Stop() error { + p.Lock() + defer p.Unlock() var err error - if s.fcgi != nil { - err = s.fcgi.Shutdown(context.Background()) + if p.fcgi != nil { + err = p.fcgi.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the fcgi server", "error", err) + p.log.Error("error shutting down the fcgi server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } - if s.https != nil { - err = s.https.Shutdown(context.Background()) + if p.https != nil { + err = p.https.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the https server", "error", err) + p.log.Error("error shutting down the https server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } - if s.http != nil { - err = s.http.Shutdown(context.Background()) + if p.http != nil { + err = p.http.Shutdown(context.Background()) if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the http server", "error", err) + p.log.Error("error shutting down the http server", "error", err) // write error and try to stop other transport err = multierror.Append(err) } } // check for safety - if s.pool != nil { - s.pool.Destroy(context.Background()) + if p.pool != nil { + p.pool.Destroy(context.Background()) } return err } // ServeHTTP handles connection using set of middleware and pool PSR-7 server. -func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if headerContainsUpgrade(r) { http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) return } - if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect { - s.redirect(w, r) + if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect { + p.redirect(w, r) return } - if s.https != nil && r.TLS != nil { + if p.https != nil && r.TLS != nil { w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") } r = attributes.Init(r) // protect the case, when user sendEvent Reset and we are replacing handler with pool - s.RLock() - s.handler.ServeHTTP(w, r) - s.RUnlock() + p.RLock() + p.handler.ServeHTTP(w, r) + p.RUnlock() } // Workers returns slice with the process states for the workers -func (s *Plugin) Workers() []process.State { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Workers() []process.State { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() ps := make([]process.State, 0, len(workers)) for i := 0; i < len(workers); i++ { @@ -290,74 +290,74 @@ func (s *Plugin) Workers() []process.State { } // internal -func (s *Plugin) workers() []worker.BaseProcess { - return s.pool.Workers() +func (p *Plugin) workers() []worker.BaseProcess { + return p.pool.Workers() } // Name returns endure.Named interface implementation -func (s *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // Reset destroys the old pool and replaces it with new one, waiting for old pool to die -func (s *Plugin) Reset() error { - s.Lock() - defer s.Unlock() +func (p *Plugin) Reset() error { + p.Lock() + defer p.Unlock() const op = errors.Op("http_plugin_reset") - s.log.Info("HTTP plugin got restart request. Restarting...") - s.pool.Destroy(context.Background()) - s.pool = nil + p.log.Info("HTTP plugin got restart request. Restarting...") + p.pool.Destroy(context.Background()) + p.pool = nil var err error - s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) + p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{ + Debug: p.cfg.Pool.Debug, + NumWorkers: p.cfg.Pool.NumWorkers, + MaxJobs: p.cfg.Pool.MaxJobs, + AllocateTimeout: p.cfg.Pool.AllocateTimeout, + DestroyTimeout: p.cfg.Pool.DestroyTimeout, + Supervisor: p.cfg.Pool.Supervisor, + }, p.cfg.Env, p.logCallback) if err != nil { return errors.E(op, err) } - s.log.Info("HTTP workers Pool successfully restarted") + p.log.Info("HTTP workers Pool successfully restarted") - s.handler, err = handler.NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.Cidrs, - s.pool, + p.handler, err = handler.NewHandler( + p.cfg.MaxRequestSize, + *p.cfg.Uploads, + p.cfg.Cidrs, + p.pool, ) if err != nil { return errors.E(op, err) } - s.log.Info("HTTP handler listeners successfully re-added") - s.handler.AddListener(s.logCallback) + p.log.Info("HTTP handler listeners successfully re-added") + p.handler.AddListener(p.logCallback) - s.log.Info("HTTP plugin successfully restarted") + p.log.Info("HTTP plugin successfully restarted") return nil } // Collects collecting http middlewares -func (s *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - s.AddMiddleware, + p.AddMiddleware, } } // AddMiddleware is base requirement for the middleware (name and Middleware) -func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { - s.mdwr[name.Name()] = m +func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) { + p.mdwr[name.Name()] = m } // Status return status of the particular plugin -func (s *Plugin) Status() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Status() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { if workers[i].State().IsActive() { return status.Status{ @@ -372,14 +372,14 @@ func (s *Plugin) Status() status.Status { } // Ready return readiness status of the particular plugin -func (s *Plugin) Ready() status.Status { - s.RLock() - defer s.RUnlock() +func (p *Plugin) Ready() status.Status { + p.RLock() + defer p.RUnlock() - workers := s.workers() + workers := p.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) - // we assume, that plugin's worker pool is ready + // we assume, that plugin'p worker pool is ready if workers[i].State().Value() == worker.StateReady { return status.Status{ Code: http.StatusOK, @@ -393,4 +393,4 @@ func (s *Plugin) Ready() status.Status { } // Available interface implementation -func (s *Plugin) Available() {} +func (p *Plugin) Available() {} |