Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r--  plugins/http/plugin.go  224
1 file changed, 112 insertions(+), 112 deletions(-)
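The diff below renames the method receiver of Plugin from s to p throughout the file, with no functional change. For orientation, here is a minimal, hypothetical sketch of the convention being applied (the fields and method below are illustrative, not copied from this file): Go style prefers a short receiver name derived from the type, used consistently on every method.

// Hypothetical illustration of the receiver-naming convention this commit applies.
type Plugin struct {
	name string
}

// Preferred: the receiver p mirrors the type name Plugin and is used
// consistently across all methods, which is what this commit enforces.
func (p *Plugin) Name() string {
	return p.name
}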
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index d5324246..8bcffb63 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -71,47 +71,47 @@ type Plugin struct {
// Init configures the service and must return an error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error {
const op = errors.Op("http_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, err)
}
- err = s.cfg.InitDefaults()
+ err = p.cfg.InitDefaults()
if err != nil {
return errors.E(op, err)
}
// rr logger (via plugin)
- s.log = rrLogger
+ p.log = rrLogger
// use time and date in UTC format
- s.stdLog = log.New(logger.NewStdAdapter(s.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
+ p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC)
- s.mdwr = make(map[string]Middleware)
+ p.mdwr = make(map[string]Middleware)
- if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() {
+ if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() {
return errors.E(op, errors.Disabled)
}
// init if nil
- if s.cfg.Env == nil {
- s.cfg.Env = make(map[string]string)
+ if p.cfg.Env == nil {
+ p.cfg.Env = make(map[string]string)
}
- s.cfg.Env[RrMode] = "http"
- s.server = server
+ p.cfg.Env[RrMode] = "http"
+ p.server = server
return nil
}
-func (s *Plugin) logCallback(event interface{}) {
+func (p *Plugin) logCallback(event interface{}) {
if ev, ok := event.(handler.ResponseEvent); ok {
- s.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
+ p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI),
"remote", ev.Request.RemoteAddr,
"elapsed", ev.Elapsed().String(),
)
@@ -119,60 +119,60 @@ func (s *Plugin) logCallback(event interface{}) {
}
// Serve serves the service.
-func (s *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 2)
// run the whole process in a goroutine
go func() {
// protect http initialization
- s.Lock()
- s.serve(errCh)
- s.Unlock()
+ p.Lock()
+ p.serve(errCh)
+ p.Unlock()
}()
return errCh
}
-func (s *Plugin) serve(errCh chan error) {
+func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
errCh <- errors.E(op, err)
return
}
- s.handler.AddListener(s.logCallback)
+ p.handler.AddListener(p.logCallback)
- if s.cfg.EnableHTTP() {
- if s.cfg.EnableH2C() {
- s.http = &http.Server{Handler: h2c.NewHandler(s, &http2.Server{}), ErrorLog: s.stdLog}
+ if p.cfg.EnableHTTP() {
+ if p.cfg.EnableH2C() {
+ p.http = &http.Server{Handler: h2c.NewHandler(p, &http2.Server{}), ErrorLog: p.stdLog}
} else {
- s.http = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ p.http = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
}
- if s.cfg.EnableTLS() {
- s.https = s.initSSL()
- if s.cfg.SSLConfig.RootCA != "" {
- err = s.appendRootCa()
+ if p.cfg.EnableTLS() {
+ p.https = p.initSSL()
+ if p.cfg.SSLConfig.RootCA != "" {
+ err = p.appendRootCa()
if err != nil {
errCh <- errors.E(op, err)
return
@@ -180,102 +180,102 @@ func (s *Plugin) serve(errCh chan error) {
}
// if HTTP2Config is not nil
- if s.cfg.HTTP2Config != nil {
- if err := s.initHTTP2(); err != nil {
+ if p.cfg.HTTP2Config != nil {
+ if err := p.initHTTP2(); err != nil {
errCh <- errors.E(op, err)
return
}
}
}
- if s.cfg.EnableFCGI() {
- s.fcgi = &http.Server{Handler: s, ErrorLog: s.stdLog}
+ if p.cfg.EnableFCGI() {
+ p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog}
}
// start http, https and fcgi servers if requested in the config
go func() {
- s.serveHTTP(errCh)
+ p.serveHTTP(errCh)
}()
go func() {
- s.serveHTTPS(errCh)
+ p.serveHTTPS(errCh)
}()
go func() {
- s.serveFCGI(errCh)
+ p.serveFCGI(errCh)
}()
}
// Stop stops the http, https and fcgi servers.
-func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Stop() error {
+ p.Lock()
+ defer p.Unlock()
var err error
- if s.fcgi != nil {
- err = s.fcgi.Shutdown(context.Background())
+ if p.fcgi != nil {
+ err = p.fcgi.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the fcgi server", "error", err)
+ p.log.Error("error shutting down the fcgi server", "error", err)
// record the error and try to stop the other transports
err = multierror.Append(err)
}
}
- if s.https != nil {
- err = s.https.Shutdown(context.Background())
+ if p.https != nil {
+ err = p.https.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the https server", "error", err)
+ p.log.Error("error shutting down the https server", "error", err)
// record the error and try to stop the other transports
err = multierror.Append(err)
}
}
- if s.http != nil {
- err = s.http.Shutdown(context.Background())
+ if p.http != nil {
+ err = p.http.Shutdown(context.Background())
if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the http server", "error", err)
+ p.log.Error("error shutting down the http server", "error", err)
// record the error and try to stop the other transports
err = multierror.Append(err)
}
}
// check for safety
- if s.pool != nil {
- s.pool.Destroy(context.Background())
+ if p.pool != nil {
+ p.pool.Destroy(context.Background())
}
return err
}
// ServeHTTP handles the connection using the set of middleware and the PSR-7 worker pool.
-func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if headerContainsUpgrade(r) {
http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
return
}
- if s.https != nil && r.TLS == nil && s.cfg.SSLConfig.Redirect {
- s.redirect(w, r)
+ if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect {
+ p.redirect(w, r)
return
}
- if s.https != nil && r.TLS != nil {
+ if p.https != nil && r.TLS != nil {
w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
}
r = attributes.Init(r)
// protect the case when the user sends Reset and we are replacing the handler and the pool
- s.RLock()
- s.handler.ServeHTTP(w, r)
- s.RUnlock()
+ p.RLock()
+ p.handler.ServeHTTP(w, r)
+ p.RUnlock()
}
// Workers returns a slice with the process states of the workers
-func (s *Plugin) Workers() []process.State {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Workers() []process.State {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
ps := make([]process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
@@ -290,74 +290,74 @@ func (s *Plugin) Workers() []process.State {
}
// internal
-func (s *Plugin) workers() []worker.BaseProcess {
- return s.pool.Workers()
+func (p *Plugin) workers() []worker.BaseProcess {
+ return p.pool.Workers()
}
// Name returns endure.Named interface implementation
-func (s *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// Reset destroys the old pool and replaces it with a new one, waiting for the old pool to die
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
const op = errors.Op("http_plugin_reset")
- s.log.Info("HTTP plugin got restart request. Restarting...")
- s.pool.Destroy(context.Background())
- s.pool = nil
+ p.log.Info("HTTP plugin got restart request. Restarting...")
+ p.pool.Destroy(context.Background())
+ p.pool = nil
var err error
- s.pool, err = s.server.NewWorkerPool(context.Background(), pool.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
+ p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ Debug: p.cfg.Pool.Debug,
+ NumWorkers: p.cfg.Pool.NumWorkers,
+ MaxJobs: p.cfg.Pool.MaxJobs,
+ AllocateTimeout: p.cfg.Pool.AllocateTimeout,
+ DestroyTimeout: p.cfg.Pool.DestroyTimeout,
+ Supervisor: p.cfg.Pool.Supervisor,
+ }, p.cfg.Env, p.logCallback)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP workers Pool successfully restarted")
+ p.log.Info("HTTP workers Pool successfully restarted")
- s.handler, err = handler.NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.Cidrs,
- s.pool,
+ p.handler, err = handler.NewHandler(
+ p.cfg.MaxRequestSize,
+ *p.cfg.Uploads,
+ p.cfg.Cidrs,
+ p.pool,
)
if err != nil {
return errors.E(op, err)
}
- s.log.Info("HTTP handler listeners successfully re-added")
- s.handler.AddListener(s.logCallback)
+ p.log.Info("HTTP handler listeners successfully re-added")
+ p.handler.AddListener(p.logCallback)
- s.log.Info("HTTP plugin successfully restarted")
+ p.log.Info("HTTP plugin successfully restarted")
return nil
}
// Collects returns the methods used to collect http middleware
-func (s *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- s.AddMiddleware,
+ p.AddMiddleware,
}
}
// AddMiddleware is the base requirement for middleware (name and Middleware)
-func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
- s.mdwr[name.Name()] = m
+func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) {
+ p.mdwr[name.Name()] = m
}
// Status returns the status of the plugin
-func (s *Plugin) Status() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Status() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
if workers[i].State().IsActive() {
return status.Status{
@@ -372,14 +372,14 @@ func (s *Plugin) Status() status.Status {
}
// Ready returns the readiness status of the plugin
-func (s *Plugin) Ready() status.Status {
- s.RLock()
- defer s.RUnlock()
+func (p *Plugin) Ready() status.Status {
+ p.RLock()
+ defer p.RUnlock()
- workers := s.workers()
+ workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin's worker pool is ready
+ // we assume that the plugin's worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
@@ -393,4 +393,4 @@ func (s *Plugin) Ready() status.Status {
}
// Available interface implementation
-func (s *Plugin) Available() {}
+func (p *Plugin) Available() {}
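For reference, the methods above imply roughly the following field set on Plugin. This is a sketch reconstructed from usage in this diff, not the verbatim declaration; the exact field types (e.g. *Config, pool.Pool, *handler.Handler) are assumptions inferred from the call sites.

// Sketch of the Plugin struct as implied by the methods in this diff.
// Types marked "assumed" are inferred from usage, not from the source.
type Plugin struct {
	sync.RWMutex // Reset/Stop take the write lock; ServeHTTP, Workers, Status, Ready take the read lock

	cfg     *Config               // assumed: the unmarshalled "http" config section
	log     logger.Logger         // rr logger, set in Init
	stdLog  *log.Logger           // UTC-stamped adapter used as http.Server.ErrorLog
	mdwr    map[string]Middleware // middleware registered via AddMiddleware
	server  server.Server         // factory used by NewWorkerPool
	pool    pool.Pool             // assumed interface: Workers() and Destroy()
	handler *handler.Handler      // assumed: PSR-7 handler with ServeHTTP and AddListener
	http    *http.Server
	https   *http.Server
	fcgi    *http.Server
}

The embedded RWMutex is what makes Reset safe while requests are in flight: Reset swaps pool and handler under the write lock, while ServeHTTP dereferences handler under the read lock.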