summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-03 22:52:30 +0300
committerValery Piashchynski <[email protected]>2021-05-03 22:52:30 +0300
commit9ee78f937d5be67058882dd3590f89da35bca239 (patch)
tree17cda27feabf5f2b8afc6a2796117835045afd36 /pkg
parent009b7009885d8a15e6fa6c7e78436087b2f20129 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/payload/payload.go5
-rwxr-xr-xpkg/pool/static_pool.go5
-rw-r--r--pkg/worker_handler/constants.go8
-rw-r--r--pkg/worker_handler/errors.go25
-rw-r--r--pkg/worker_handler/errors_windows.go27
-rw-r--r--pkg/worker_handler/handler.go217
-rw-r--r--pkg/worker_handler/parse.go149
-rw-r--r--pkg/worker_handler/request.go187
-rw-r--r--pkg/worker_handler/response.go105
-rw-r--r--pkg/worker_handler/uploads.go159
10 files changed, 877 insertions, 10 deletions
diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go
index bf3972aa..1fe334eb 100755
--- a/pkg/payload/payload.go
+++ b/pkg/payload/payload.go
@@ -16,8 +16,3 @@ type Payload struct {
func (p *Payload) String() string {
return toString(p.Body)
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 06005d98..e769093c 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -327,8 +327,3 @@ func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess,
}
return workers, nil
}
-
-// unsafe, but lightning fast []byte to string conversion
-func toString(data []byte) string {
- return *(*string)(unsafe.Pointer(&data))
-}
diff --git a/pkg/worker_handler/constants.go b/pkg/worker_handler/constants.go
new file mode 100644
index 00000000..3355d9c2
--- /dev/null
+++ b/pkg/worker_handler/constants.go
@@ -0,0 +1,8 @@
+package handler
+
+import "net/http"
+
+var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
+
+// TrailerHeaderKey http header key
+var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go
new file mode 100644
index 00000000..5fa8e64e
--- /dev/null
+++ b/pkg/worker_handler/errors.go
@@ -0,0 +1,25 @@
+// +build !windows
+
+package handler
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+// Broken pipe
+var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
+
+// handleWriteError just check if error was caused by aborted connection on linux
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if errors.Is(syscallErr.Err, syscall.EPIPE) {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
new file mode 100644
index 00000000..390cc7d1
--- /dev/null
+++ b/pkg/worker_handler/errors_windows.go
@@ -0,0 +1,27 @@
+// +build windows
+
+package handler
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+//Software caused connection abort.
+//An established connection was aborted by the software in your host computer,
+//possibly due to a data transmission time-out or protocol error.
+var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
+
+// handleWriteError just check if error was caused by aborted connection on windows
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.WSAECONNABORTED {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
new file mode 100644
index 00000000..be53fc12
--- /dev/null
+++ b/pkg/worker_handler/handler.go
@@ -0,0 +1,217 @@
+package handler
+
+import (
+ "net"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// MB is 1024 bytes
+const MB uint64 = 1024 * 1024
+
+// ErrorEvent represents singular http error event.
+type ErrorEvent struct {
+ // Request contains client request, must not be stored.
+ Request *http.Request
+
+ // Error - associated error, if any.
+ Error error
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ErrorEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// ResponseEvent represents singular http response event.
+type ResponseEvent struct {
+ // Request contains client request, must not be stored.
+ Request *Request
+
+ // Response contains service response.
+ Response *Response
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ResponseEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Handler struct {
+ maxRequestSize uint64
+ uploads config.Uploads
+ trusted config.Cidrs
+ log logger.Logger
+ pool pool.Pool
+ mul sync.Mutex
+ lsn events.Listener
+}
+
+// NewHandler return handle interface implementation
+func NewHandler(maxReqSize uint64, uploads config.Uploads, trusted config.Cidrs, pool pool.Pool) (*Handler, error) {
+ if pool == nil {
+ return nil, errors.E(errors.Str("pool should be initialized"))
+ }
+ return &Handler{
+ maxRequestSize: maxReqSize * MB,
+ uploads: uploads,
+ pool: pool,
+ trusted: trusted,
+ }, nil
+}
+
+// AddListener attaches handler event controller.
+func (h *Handler) AddListener(l events.Listener) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ h.lsn = l
+}
+
+// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ const op = errors.Op("http_plugin_serve_http")
+ start := time.Now()
+
+ // validating request size
+ if h.maxRequestSize != 0 {
+ const op = errors.Op("http_handler_max_size")
+ if length := r.Header.Get("content-length"); length != "" {
+ // try to parse the value from the `content-length` header
+ size, err := strconv.ParseInt(length, 10, 64)
+ if err != nil {
+ // if got an error while parsing -> assign 500 code to the writer and return
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("error while parsing value from the `content-length` header")), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ if size > int64(h.maxRequestSize) {
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, errors.Str("request body max size is exceeded")), start: start, elapsed: time.Since(start)})
+ http.Error(w, errors.E(op, errors.Str("request body max size is exceeded")).Error(), 500)
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, h.uploads)
+ if err != nil {
+ // if pipe is broken, there is no sense to write the header
+ // in this case we just report about error
+ if err == errEPIPE {
+ h.sendEvent(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ // proxy IP resolution
+ h.resolveIP(req)
+
+ req.Open(h.log)
+ defer req.Close(h.log)
+
+ p, err := req.Payload()
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ rsp, err := h.pool.Exec(p)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), resp.Status)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ return
+ }
+
+ h.handleResponse(req, resp, start)
+ err = resp.Write(w)
+ if err != nil {
+ http.Error(w, errors.E(op, err).Error(), 500)
+ h.sendEvent(ErrorEvent{Request: r, Error: errors.E(op, err), start: start, elapsed: time.Since(start)})
+ }
+}
+
+// handleResponse triggers response event.
+func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
+ h.sendEvent(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
+}
+
+// sendEvent invokes event handler if any.
+func (h *Handler) sendEvent(event interface{}) {
+ if h.lsn != nil {
+ h.lsn(event)
+ }
+}
+
+// get real ip passing multiple proxy
+func (h *Handler) resolveIP(r *Request) {
+ if h.trusted.IsTrusted(r.RemoteAddr) == false { //nolint:gosimple
+ return
+ }
+
+ if r.Header.Get("X-Forwarded-For") != "" {
+ ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
+ ipCount := len(ips)
+
+ for i := ipCount - 1; i >= 0; i-- {
+ addr := strings.TrimSpace(ips[i])
+ if net.ParseIP(addr) != nil {
+ r.RemoteAddr = addr
+ return
+ }
+ }
+
+ return
+ }
+
+ // The logic here is the following:
+ // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address
+ // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
+ // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
+ // CF-Connecting-IP is an Enterprise feature and we check it last in order.
+ // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
+ if r.Header.Get("X-Real-Ip") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
+ return
+ }
+
+ if r.Header.Get("True-Client-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
+ return
+ }
+
+ if r.Header.Get("CF-Connecting-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
+ }
+}
diff --git a/pkg/worker_handler/parse.go b/pkg/worker_handler/parse.go
new file mode 100644
index 00000000..2790da2a
--- /dev/null
+++ b/pkg/worker_handler/parse.go
@@ -0,0 +1,149 @@
+package handler
+
+import (
+ "net/http"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) dataTree {
+ data := make(dataTree)
+ if r.PostForm != nil {
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+ }
+
+ if r.MultipartForm != nil {
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+ }
+
+ return data
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ keys := FetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(i) == 1 {
+ // single value context (last element)
+ d[i[0]] = v[len(v)-1]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
+func parseUploads(r *http.Request, cfg config.Uploads) *Uploads {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ keys := FetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// FetchIndexes parses input name and splits it into separate indexes list.
+func FetchIndexes(s string) []string {
+ var (
+ pos int
+ ch string
+ keys = make([]string, 1)
+ )
+
+ for _, c := range s {
+ ch = string(c)
+ switch ch {
+ case " ":
+ // ignore all spaces
+ continue
+ case "[":
+ pos = 1
+ continue
+ case "]":
+ if pos == 1 {
+ keys = append(keys, "")
+ }
+ pos = 2
+ default:
+ if pos == 1 || pos == 2 {
+ keys = append(keys, "")
+ }
+
+ keys[len(keys)-1] += ch
+ pos = 0
+ }
+ }
+
+ return keys
+}
diff --git a/pkg/worker_handler/request.go b/pkg/worker_handler/request.go
new file mode 100644
index 00000000..178bc827
--- /dev/null
+++ b/pkg/worker_handler/request.go
@@ -0,0 +1,187 @@
+package handler
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+
+ j "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+ contentNone = iota + 900
+ contentStream
+ contentMultipart
+ contentFormData
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
+ RemoteAddr string `json:"remoteAddr"`
+
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // URI contains full request URI with scheme and query.
+ URI string `json:"uri"`
+
+ // Header contains list of request headers.
+ Header http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
+ Attributes map[string]interface{} `json:"attributes"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+func fetchIP(pair string) string {
+ if !strings.ContainsRune(pair, ':') {
+ return pair
+ }
+
+ addr, _, _ := net.SplitHostPort(pair)
+ return addr
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg config.Uploads) (*Request, error) {
+ req := &Request{
+ RemoteAddr: fetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: uri(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ Attributes: attributes.All(r),
+ }
+
+ for _, c := range r.Cookies() {
+ if v, err := url.QueryUnescape(c.Value); err == nil {
+ req.Cookies[c.Name] = v
+ }
+ }
+
+ switch req.contentType() {
+ case contentNone:
+ return req, nil
+
+ case contentStream:
+ var err error
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+
+ case contentMultipart:
+ if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ req.Uploads = parseUploads(r, cfg)
+ fallthrough
+ case contentFormData:
+ if err := r.ParseForm(); err != nil {
+ return nil, err
+ }
+
+ req.body = parseData(r)
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(log logger.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Open(log)
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close(log logger.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear(log)
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (payload.Payload, error) {
+ p := payload.Payload{}
+
+ var err error
+ if p.Context, err = json.Marshal(r); err != nil {
+ return payload.Payload{}, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = json.Marshal(r.body); err != nil {
+ return payload.Payload{}, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// contentType returns the payload content type.
+func (r *Request) contentType() int {
+ if r.Method == "HEAD" || r.Method == "OPTIONS" {
+ return contentNone
+ }
+
+ ct := r.Header.Get("content-type")
+ if strings.Contains(ct, "application/x-www-form-urlencoded") {
+ return contentFormData
+ }
+
+ if strings.Contains(ct, "multipart/form-data") {
+ return contentMultipart
+ }
+
+ return contentStream
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.URL.Host != "" {
+ return r.URL.String()
+ }
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/pkg/worker_handler/response.go b/pkg/worker_handler/response.go
new file mode 100644
index 00000000..1763d304
--- /dev/null
+++ b/pkg/worker_handler/response.go
@@ -0,0 +1,105 @@
+package handler
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+)
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Header contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated Body payload.
+ Body interface{}
+ sync.Mutex
+}
+
+// NewResponse creates new response based on given pool payload.
+func NewResponse(p payload.Payload) (*Response, error) {
+ r := &Response{Body: p.Body}
+ if err := json.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ // INFO map is the reference type in golang
+ p := handlePushHeaders(r.Headers)
+ if pusher, ok := w.(http.Pusher); ok {
+ for _, v := range p {
+ err := pusher.Push(v, nil)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ handleTrailers(r.Headers)
+ for n, h := range r.Headers {
+ for _, v := range h {
+ w.Header().Add(n, v)
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.Body.([]byte); ok {
+ _, err := w.Write(data)
+ if err != nil {
+ return handleWriteError(err)
+ }
+ }
+
+ if rc, ok := r.Body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func handlePushHeaders(h map[string][]string) []string {
+ var p []string
+ pushHeader, ok := h[http2pushHeaderKey]
+ if !ok {
+ return p
+ }
+
+ p = append(p, pushHeader...)
+
+ delete(h, http2pushHeaderKey)
+
+ return p
+}
+
+func handleTrailers(h map[string][]string) {
+ trailers, ok := h[TrailerHeaderKey]
+ if !ok {
+ return
+ }
+
+ for _, tr := range trailers {
+ for _, n := range strings.Split(tr, ",") {
+ n = strings.Trim(n, "\t ")
+ if v, ok := h[n]; ok {
+ h["Trailer:"+n] = v
+
+ delete(h, n)
+ }
+ }
+ }
+
+ delete(h, TrailerHeaderKey)
+}
diff --git a/pkg/worker_handler/uploads.go b/pkg/worker_handler/uploads.go
new file mode 100644
index 00000000..e695000e
--- /dev/null
+++ b/pkg/worker_handler/uploads.go
@@ -0,0 +1,159 @@
+package handler
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/http/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "os"
+ "sync"
+)
+
+const (
+ // UploadErrorOK - no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // UploadErrorNoFile - no file was uploaded.
+ UploadErrorNoFile = 4
+
+ // UploadErrorNoTmpDir - missing a temporary folder.
+ UploadErrorNoTmpDir = 6
+
+ // UploadErrorCantWrite - failed to write file to disk.
+ UploadErrorCantWrite = 7
+
+ // UploadErrorExtension - forbidden file extension.
+ UploadErrorExtension = 8
+)
+
+// Uploads tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg config.Uploads
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ return json.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open(log logger.Logger) {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ err := f.Open(u.cfg)
+ if err != nil && log != nil {
+ log.Error("error opening the file", "err", err)
+ }
+ }(f)
+ }
+
+ wg.Wait()
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear(log logger.Logger) {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ err := os.Remove(f.TempFilename)
+ if err != nil && log != nil {
+ log.Error("error removing the file", "err", err)
+ }
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // ID contains filename specified by the client.
+ Name string `json:"name"`
+
+ // Mime contains mime-type provided by the client.
+ Mime string `json:"mime"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ Mime: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+// Open moves file content into temporary file available for PHP.
+// NOTE:
+// There is 2 deferred functions, and in case of getting 2 errors from both functions
+// error from close of temp file would be overwritten by error from the main file
+// STACK
+// DEFER FILE CLOSE (2)
+// DEFER TMP CLOSE (1)
+func (f *FileUpload) Open(cfg config.Uploads) (err error) {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+
+ defer func() {
+ // close the main file
+ err = file.Close()
+ }()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer func() {
+ // close the temp file
+ err = tmp.Close()
+ }()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ return false
+ }
+ return true
+}