diff options
Diffstat (limited to 'plugins/http/worker_handler')
-rw-r--r-- | plugins/http/worker_handler/constants.go | 8 | ||||
-rw-r--r-- | plugins/http/worker_handler/errors.go | 25 | ||||
-rw-r--r-- | plugins/http/worker_handler/errors_windows.go | 27 | ||||
-rw-r--r-- | plugins/http/worker_handler/handler.go | 217 | ||||
-rw-r--r-- | plugins/http/worker_handler/parse.go | 149 | ||||
-rw-r--r-- | plugins/http/worker_handler/request.go | 187 | ||||
-rw-r--r-- | plugins/http/worker_handler/response.go | 105 | ||||
-rw-r--r-- | plugins/http/worker_handler/uploads.go | 159 |
8 files changed, 877 insertions, 0 deletions
diff --git a/plugins/http/worker_handler/constants.go b/plugins/http/worker_handler/constants.go new file mode 100644 index 00000000..3355d9c2 --- /dev/null +++ b/plugins/http/worker_handler/constants.go @@ -0,0 +1,8 @@ +package handler + +import "net/http" + +var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") + +// TrailerHeaderKey http header key +var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/worker_handler/errors.go b/plugins/http/worker_handler/errors.go new file mode 100644 index 00000000..5fa8e64e --- /dev/null +++ b/plugins/http/worker_handler/errors.go @@ -0,0 +1,25 @@ +// +build !windows + +package handler + +import ( + "errors" + "net" + "os" + "syscall" +) + +// Broken pipe +var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") + +// handleWriteError just check if error was caused by aborted connection on linux +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if errors.Is(syscallErr.Err, syscall.EPIPE) { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/worker_handler/errors_windows.go b/plugins/http/worker_handler/errors_windows.go new file mode 100644 index 00000000..390cc7d1 --- /dev/null +++ b/plugins/http/worker_handler/errors_windows.go @@ -0,0 +1,27 @@ +// +build windows + +package handler + +import ( + "errors" + "net" + "os" + "syscall" +) + +//Software caused connection abort. +//An established connection was aborted by the software in your host computer, +//possibly due to a data transmission time-out or protocol error. +var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") + +// handleWriteError just check if error was caused by aborted connection on windows +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.WSAECONNABORTED { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/worker_handler/handler.go b/plugins/http/worker_handler/handler.go new file mode 100644 index 00000000..be53fc12 --- /dev/null +++ b/plugins/http/worker_handler/handler.go @@ -0,0 +1,217 @@ +package handler + +import ( + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// MB is 1024 bytes +const MB uint64 = 1024 * 1024 + +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request + + // Error - associated error, if any. + Error error + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ErrorEvent) Elapsed() time.Duration { + return e.elapsed +} + +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request + + // Response contains service response. + Response *Response + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ResponseEvent) Elapsed() time.Duration { + return e.elapsed +} + +// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Handler struct { + maxRequestSize uint64 + uploads config.Uploads + trusted config.Cidrs + log logger.Logger + pool pool.Pool + mul sync.Mutex + lsn events.Listener +} + +// NewHandler return handle interface implementation +func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) { + if pool == nil { + return nil, errors.E(errors.Str("pool should be initialized")) + } + return &Handler{ + maxRequestSize: maxReqSize * MB, + uploads: uploads, + pool: pool, + trusted: trusted, + }, nil +} + +// AddListener attaches handler event controller. +func (h *Handler) AddListener(l events.Listener) { + h.mul.Lock() + defer h.mul.Unlock() + + h.lsn = l +} + +// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + const op = errors.Op("http_plugin_serve_http") + start := time.Now() + + // validating request size + if h.maxRequestSize != 0 { + const op = errors.Op("http_handler_max_size") + if length := r.Header.Get("content-length"); length != "" { + // try to parse the value from the `content-length` header + size, err := strconv.ParseInt(length, 10, 64) + if err != nil { + // if got an error while parsing -> assign 500 code to the writer and return + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)}) + return + } + + if size > int64(h.maxRequestSize) { + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)}) + http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500) + return + } + } + } + + req, err := NewRequest(r, h.uploads) + if err != nil { + // if pipe is broken, there is no sense to write the header + // in this case we just report about error + if err == errEPIPE { + h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) + return + } + + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + // proxy IP resolution + h.resolveIP(req) + + req.Open(h.log) + defer req.Close(h.log) + + p, err := req.Payload() + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + rsp, err := h.pool.Exec(p) + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + http.Error(w, errors.E(op, err).Error(), resp.Status) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + return + } + + h.handleResponse(req, resp, start) + err = resp.Write(w) + if err != nil { + http.Error(w, errors.E(op, err).Error(), 500) + h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)}) + } +} + +// handleResponse triggers response event. +func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { + h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) +} + +// sendEvent invokes event handler if any. +func (h *Handler) sendEvent(event interface{}) { + if h.lsn != nil { + h.lsn(event) + } +} + +// get real ip passing multiple proxy +func (h *Handler) resolveIP(r *Request) { + if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple + return + } + + if r.Header.Get("X-Forwarded-For") != "" { + ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") + ipCount := len(ips) + + for i := ipCount - 1; i >= 0; i-- { + addr := strings.TrimSpace(ips[i]) + if net.ParseIP(addr) != nil { + r.RemoteAddr = addr + return + } + } + + return + } + + // The logic here is the following: + // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address + // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers + // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. + // CF-Connecting-IP is an Enterprise feature and we check it last in order. + // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string + if r.Header.Get("X-Real-Ip") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) + return + } + + if r.Header.Get("True-Client-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) + return + } + + if r.Header.Get("CF-Connecting-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) + } +} diff --git a/plugins/http/worker_handler/parse.go b/plugins/http/worker_handler/parse.go new file mode 100644 index 00000000..2790da2a --- /dev/null +++ b/plugins/http/worker_handler/parse.go @@ -0,0 +1,149 @@ +package handler + +import ( + "net/http" + + "github.com/spiral/roadrunner/v2/plugins/http/config" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) dataTree { + data := make(dataTree) + if r.PostForm != nil { + for k, v := range r.PostForm { + data.push(k, v) + } + } + + if r.MultipartForm != nil { + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + } + + return data +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + keys := FetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(i) == 1 { + // single value context (last element) + d[i[0]] = v[len(v)-1] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + return + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including contentMultipart form dataTree) +func parseUploads(r *http.Request, cfg config.Uploads) *Uploads { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + keys := FetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + return + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// FetchIndexes parses input name and splits it into separate indexes list. +func FetchIndexes(s string) []string { + var ( + pos int + ch string + keys = make([]string, 1) + ) + + for _, c := range s { + ch = string(c) + switch ch { + case " ": + // ignore all spaces + continue + case "[": + pos = 1 + continue + case "]": + if pos == 1 { + keys = append(keys, "") + } + pos = 2 + default: + if pos == 1 || pos == 2 { + keys = append(keys, "") + } + + keys[len(keys)-1] += ch + pos = 0 + } + } + + return keys +} diff --git a/plugins/http/worker_handler/request.go b/plugins/http/worker_handler/request.go new file mode 100644 index 00000000..178bc827 --- /dev/null +++ b/plugins/http/worker_handler/request.go @@ -0,0 +1,187 @@ +package handler + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + + j "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/plugins/http/attributes" + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +var json = j.ConfigCompatibleWithStandardLibrary + +const ( + defaultMaxMemory = 32 << 20 // 32 MB + contentNone = iota + 900 + contentStream + contentMultipart + contentFormData +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // URI contains full request URI with scheme and query. + URI string `json:"uri"` + + // Header contains list of request headers. + Header http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. + Attributes map[string]interface{} `json:"attributes"` + + // request body can be parsedData or []byte + body interface{} +} + +func fetchIP(pair string) string { + if !strings.ContainsRune(pair, ':') { + return pair + } + + addr, _, _ := net.SplitHostPort(pair) + return addr +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) { + req := &Request{ + RemoteAddr: fetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: uri(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + Attributes: attributes.All(r), + } + + for _, c := range r.Cookies() { + if v, err := url.QueryUnescape(c.Value); err == nil { + req.Cookies[c.Name] = v + } + } + + switch req.contentType() { + case contentNone: + return req, nil + + case contentStream: + var err error + req.body, err = ioutil.ReadAll(r.Body) + return req, err + + case contentMultipart: + if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + req.Uploads = parseUploads(r, cfg) + fallthrough + case contentFormData: + if err := r.ParseForm(); err != nil { + return nil, err + } + + req.body = parseData(r) + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(log logger.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Open(log) +} + +// Close clears all temp file uploads +func (r *Request) Close(log logger.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Clear(log) +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (payload.Payload, error) { + p := payload.Payload{} + + var err error + if p.Context, err = json.Marshal(r); err != nil { + return payload.Payload{}, err + } + + if r.Parsed { + if p.Body, err = json.Marshal(r.body); err != nil { + return payload.Payload{}, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// contentType returns the payload content type. +func (r *Request) contentType() int { + if r.Method == "HEAD" || r.Method == "OPTIONS" { + return contentNone + } + + ct := r.Header.Get("content-type") + if strings.Contains(ct, "application/x-www-form-urlencoded") { + return contentFormData + } + + if strings.Contains(ct, "multipart/form-data") { + return contentMultipart + } + + return contentStream +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.URL.Host != "" { + return r.URL.String() + } + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/plugins/http/worker_handler/response.go b/plugins/http/worker_handler/response.go new file mode 100644 index 00000000..1763d304 --- /dev/null +++ b/plugins/http/worker_handler/response.go @@ -0,0 +1,105 @@ +package handler + +import ( + "io" + "net/http" + "strings" + "sync" + + "github.com/spiral/roadrunner/v2/pkg/payload" +) + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Header contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated Body payload. + Body interface{} + sync.Mutex +} + +// NewResponse creates new response based on given pool payload. +func NewResponse(p payload.Payload) (*Response, error) { + r := &Response{Body: p.Body} + if err := json.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + // INFO map is the reference type in golang + p := handlePushHeaders(r.Headers) + if pusher, ok := w.(http.Pusher); ok { + for _, v := range p { + err := pusher.Push(v, nil) + if err != nil { + return err + } + } + } + + handleTrailers(r.Headers) + for n, h := range r.Headers { + for _, v := range h { + w.Header().Add(n, v) + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.Body.([]byte); ok { + _, err := w.Write(data) + if err != nil { + return handleWriteError(err) + } + } + + if rc, ok := r.Body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} + +func handlePushHeaders(h map[string][]string) []string { + var p []string + pushHeader, ok := h[http2pushHeaderKey] + if !ok { + return p + } + + p = append(p, pushHeader...) + + delete(h, http2pushHeaderKey) + + return p +} + +func handleTrailers(h map[string][]string) { + trailers, ok := h[TrailerHeaderKey] + if !ok { + return + } + + for _, tr := range trailers { + for _, n := range strings.Split(tr, ",") { + n = strings.Trim(n, "\t ") + if v, ok := h[n]; ok { + h["Trailer:"+n] = v + + delete(h, n) + } + } + } + + delete(h, TrailerHeaderKey) +} diff --git a/plugins/http/worker_handler/uploads.go b/plugins/http/worker_handler/uploads.go new file mode 100644 index 00000000..e695000e --- /dev/null +++ b/plugins/http/worker_handler/uploads.go @@ -0,0 +1,159 @@ +package handler + +import ( + "github.com/spiral/roadrunner/v2/plugins/http/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + + "io" + "io/ioutil" + "mime/multipart" + "os" + "sync" +) + +const ( + // UploadErrorOK - no error, the file uploaded with success. + UploadErrorOK = 0 + + // UploadErrorNoFile - no file was uploaded. + UploadErrorNoFile = 4 + + // UploadErrorNoTmpDir - missing a temporary folder. + UploadErrorNoTmpDir = 6 + + // UploadErrorCantWrite - failed to write file to disk. + UploadErrorCantWrite = 7 + + // UploadErrorExtension - forbidden file extension. + UploadErrorExtension = 8 +) + +// Uploads tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg config.Uploads + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + return json.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open(log logger.Logger) { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + err := f.Open(u.cfg) + if err != nil && log != nil { + log.Error("error opening the file", "err", err) + } + }(f) + } + + wg.Wait() +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear(log logger.Logger) { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + err := os.Remove(f.TempFilename) + if err != nil && log != nil { + log.Error("error removing the file", "err", err) + } + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // ID contains filename specified by the client. + Name string `json:"name"` + + // Mime contains mime-type provided by the client. + Mime string `json:"mime"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + Mime: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +// Open moves file content into temporary file available for PHP. +// NOTE: +// There is 2 deferred functions, and in case of getting 2 errors from both functions +// error from close of temp file would be overwritten by error from the main file +// STACK +// DEFER FILE CLOSE (2) +// DEFER TMP CLOSE (1) +func (f *FileUpload) Open(cfg config.Uploads) (err error) { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + + defer func() { + // close the main file + err = file.Close() + }() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer func() { + // close the temp file + err = tmp.Close() + }() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +// exists if file exists. +func exists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} |