diff options
Diffstat (limited to 'plugins/http/plugin.go')
-rw-r--r-- | plugins/http/plugin.go | 412 |
1 files changed, 0 insertions, 412 deletions
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go deleted file mode 100644 index dc887f87..00000000 --- a/plugins/http/plugin.go +++ /dev/null @@ -1,412 +0,0 @@ -package http - -import ( - "context" - "fmt" - "log" - "net/http" - "sync" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/pkg/worker" - handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" - httpConfig "github.com/spiral/roadrunner/v2/plugins/http/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/status" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" -) - -const ( - // PluginName declares plugin name. - PluginName = "http" - - // RrMode RR_HTTP env variable key (internal) if the HTTP presents - RrMode = "RR_MODE" - - HTTPSScheme = "https" -) - -// Middleware interface -type Middleware interface { - Middleware(f http.Handler) http.Handler -} - -type middleware map[string]Middleware - -// Plugin manages pool, http servers. The main http plugin structure -type Plugin struct { - sync.RWMutex - - // plugins - server server.Server - log logger.Logger - // stdlog passed to the http/https/fcgi servers to log their internal messages - stdLog *log.Logger - - // http configuration - cfg *httpConfig.HTTP `mapstructure:"http"` - - // middlewares to chain - mdwr middleware - - // Pool which attached to all servers - pool pool.Pool - - // servers RR handler - handler *handler.Handler - - // metrics - workersExporter *workersExporter - requestsExporter *requestsExporter - - // servers - http *http.Server - https *http.Server - fcgi *http.Server -} - -// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server server.Server) error { - const op = errors.Op("http_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - - err = p.cfg.InitDefaults() - if err != nil { - return errors.E(op, err) - } - - // rr logger (via plugin) - p.log = rrLogger - // use time and date in UTC format - p.stdLog = log.New(logger.NewStdAdapter(p.log), "http_plugin: ", log.Ldate|log.Ltime|log.LUTC) - - p.mdwr = make(map[string]Middleware) - - if !p.cfg.EnableHTTP() && !p.cfg.EnableTLS() && !p.cfg.EnableFCGI() { - return errors.E(op, errors.Disabled) - } - - // init if nil - if p.cfg.Env == nil { - p.cfg.Env = make(map[string]string) - } - - // initialize workersExporter - p.workersExporter = newWorkersExporter() - // initialize requests exporter - p.requestsExporter = newRequestsExporter() - - p.cfg.Env[RrMode] = "http" - p.server = server - - return nil -} - -func (p *Plugin) logCallback(event interface{}) { - if ev, ok := event.(handler.ResponseEvent); ok { - p.log.Debug(fmt.Sprintf("%d %s %s", ev.Response.Status, ev.Request.Method, ev.Request.URI), - "remote", ev.Request.RemoteAddr, - "elapsed", ev.Elapsed().String(), - ) - } -} - -// Serve serves the svc. -func (p *Plugin) Serve() chan error { - errCh := make(chan error, 2) - // run whole process in the goroutine - go func() { - // protect http initialization - p.Lock() - p.serve(errCh) - p.Unlock() - }() - - return errCh -} - -func (p *Plugin) serve(errCh chan error) { - var err error - const op = errors.Op("http_plugin_serve") - p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ - Debug: p.cfg.Pool.Debug, - NumWorkers: p.cfg.Pool.NumWorkers, - MaxJobs: p.cfg.Pool.MaxJobs, - AllocateTimeout: p.cfg.Pool.AllocateTimeout, - DestroyTimeout: p.cfg.Pool.DestroyTimeout, - Supervisor: p.cfg.Pool.Supervisor, - }, p.cfg.Env, p.logCallback) - if err != nil { - errCh <- errors.E(op, err) - return - } - - p.handler, err = handler.NewHandler( - p.cfg.MaxRequestSize, - p.cfg.InternalErrorCode, - *p.cfg.Uploads, - p.cfg.Cidrs, - p.pool, - ) - if err != nil { - errCh <- errors.E(op, err) - return - } - - p.handler.AddListener(p.logCallback, p.metricsCallback) - - if p.cfg.EnableHTTP() { - if p.cfg.EnableH2C() { - p.http = &http.Server{ - Handler: h2c.NewHandler(p, &http2.Server{}), - ErrorLog: p.stdLog, - } - } else { - p.http = &http.Server{ - Handler: p, - ErrorLog: p.stdLog, - } - } - } - - if p.cfg.EnableTLS() { - p.https = p.initSSL() - if p.cfg.SSLConfig.RootCA != "" { - err = p.appendRootCa() - if err != nil { - errCh <- errors.E(op, err) - return - } - } - - // if HTTP2Config not nil - if p.cfg.HTTP2Config != nil { - if err := p.initHTTP2(); err != nil { - errCh <- errors.E(op, err) - return - } - } - } - - if p.cfg.EnableFCGI() { - p.fcgi = &http.Server{Handler: p, ErrorLog: p.stdLog} - } - - // start http, https and fcgi servers if requested in the config - go func() { - p.serveHTTP(errCh) - }() - - go func() { - p.serveHTTPS(errCh) - }() - - go func() { - p.serveFCGI(errCh) - }() -} - -// Stop stops the http. -func (p *Plugin) Stop() error { - p.Lock() - defer p.Unlock() - - if p.fcgi != nil { - err := p.fcgi.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - p.log.Error("fcgi shutdown", "error", err) - } - } - - if p.https != nil { - err := p.https.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - p.log.Error("https shutdown", "error", err) - } - } - - if p.http != nil { - err := p.http.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - p.log.Error("http shutdown", "error", err) - } - } - - // check for safety - if p.pool != nil { - p.pool.Destroy(context.Background()) - } - - return nil -} - -// ServeHTTP handles connection using set of middleware and pool PSR-7 server. -func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer func() { - err := r.Body.Close() - if err != nil { - p.log.Error("body close", "error", err) - } - }() - if headerContainsUpgrade(r) { - http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) - return - } - - if p.https != nil && r.TLS == nil && p.cfg.SSLConfig.Redirect { - p.redirect(w, r) - return - } - - if p.https != nil && r.TLS != nil { - w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") - } - - r = attributes.Init(r) - // protect the case, when user sendEvent Reset and we are replacing handler with pool - p.RLock() - p.handler.ServeHTTP(w, r) - p.RUnlock() -} - -// Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []*process.State { - p.RLock() - defer p.RUnlock() - - workers := p.workers() - - ps := make([]*process.State, 0, len(workers)) - for i := 0; i < len(workers); i++ { - state, err := process.WorkerProcessState(workers[i]) - if err != nil { - return nil - } - ps = append(ps, state) - } - - return ps -} - -// internal -func (p *Plugin) workers() []worker.BaseProcess { - return p.pool.Workers() -} - -// Name returns endure.Named interface implementation -func (p *Plugin) Name() string { - return PluginName -} - -// Reset destroys the old pool and replaces it with new one, waiting for old pool to die -func (p *Plugin) Reset() error { - p.Lock() - defer p.Unlock() - const op = errors.Op("http_plugin_reset") - p.log.Info("HTTP plugin got restart request. Restarting...") - p.pool.Destroy(context.Background()) - p.pool = nil - - var err error - p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{ - Debug: p.cfg.Pool.Debug, - NumWorkers: p.cfg.Pool.NumWorkers, - MaxJobs: p.cfg.Pool.MaxJobs, - AllocateTimeout: p.cfg.Pool.AllocateTimeout, - DestroyTimeout: p.cfg.Pool.DestroyTimeout, - Supervisor: p.cfg.Pool.Supervisor, - }, p.cfg.Env, p.logCallback) - if err != nil { - return errors.E(op, err) - } - - p.log.Info("HTTP workers Pool successfully restarted") - - p.handler, err = handler.NewHandler( - p.cfg.MaxRequestSize, - p.cfg.InternalErrorCode, - *p.cfg.Uploads, - p.cfg.Cidrs, - p.pool, - ) - - if err != nil { - return errors.E(op, err) - } - - p.log.Info("HTTP handler listeners successfully re-added") - p.handler.AddListener(p.logCallback, p.metricsCallback) - - p.log.Info("HTTP plugin successfully restarted") - return nil -} - -// Collects collecting http middlewares -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.AddMiddleware, - } -} - -// AddMiddleware is base requirement for the middleware (name and Middleware) -func (p *Plugin) AddMiddleware(name endure.Named, m Middleware) { - p.mdwr[name.Name()] = m -} - -// Status return status of the particular plugin -func (p *Plugin) Status() status.Status { - p.RLock() - defer p.RUnlock() - - workers := p.workers() - for i := 0; i < len(workers); i++ { - if workers[i].State().IsActive() { - return status.Status{ - Code: http.StatusOK, - } - } - } - // if there are no workers, threat this as error - return status.Status{ - Code: http.StatusServiceUnavailable, - } -} - -// Ready return readiness status of the particular plugin -func (p *Plugin) Ready() status.Status { - p.RLock() - defer p.RUnlock() - - workers := p.workers() - for i := 0; i < len(workers); i++ { - // If state of the worker is ready (at least 1) - // we assume, that plugin's worker pool is ready - if workers[i].State().Value() == worker.StateReady { - return status.Status{ - Code: http.StatusOK, - } - } - } - // if there are no workers, threat this as no content error - return status.Status{ - Code: http.StatusServiceUnavailable, - } -} - -// Available interface implementation -func (p *Plugin) Available() {} |