summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-25 14:46:01 +0300
committerValery Piashchynski <[email protected]>2020-12-25 14:46:01 +0300
commit8526c03822e724bc2ebb64b6197085fea335b782 (patch)
treeb205b392b3721606fae4fa3174327259b41bc76a /plugins
parent42b33b77793789d666451798b07587f6404242b4 (diff)
Move root plugins to the pkg
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/attributes/attributes.go85
-rw-r--r--plugins/http/config.go291
-rw-r--r--plugins/http/constants.go6
-rw-r--r--plugins/http/errors.go25
-rw-r--r--plugins/http/errors_windows.go27
-rw-r--r--plugins/http/handler.go240
-rw-r--r--plugins/http/parse.go147
-rw-r--r--plugins/http/plugin.go528
-rw-r--r--plugins/http/request.go186
-rw-r--r--plugins/http/response.go105
-rw-r--r--plugins/http/uploads.go158
-rw-r--r--plugins/http/uploads_config.go46
-rw-r--r--plugins/informer/interface.go8
-rw-r--r--plugins/informer/plugin.go55
-rw-r--r--plugins/informer/rpc.go54
-rw-r--r--plugins/server/config.go39
-rw-r--r--plugins/server/interface.go20
-rw-r--r--plugins/server/plugin.go229
18 files changed, 0 insertions, 2249 deletions
diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go
deleted file mode 100644
index 4c453766..00000000
--- a/plugins/http/attributes/attributes.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package attributes
-
-import (
- "context"
- "errors"
- "net/http"
-)
-
-// contextKey is a value for use with context.WithValue. It's used as
-// a pointer so it fits in an interface{} without allocation.
-type contextKey struct {
- name string
-}
-
-func (k *contextKey) String() string { return k.name }
-
-var (
- // PsrContextKey is a context key. It can be used in the http attributes
- PsrContextKey = &contextKey{"psr_attributes"}
-)
-
-type attrs map[string]interface{}
-
-func (v attrs) get(key string) interface{} {
- if v == nil {
- return ""
- }
-
- return v[key]
-}
-
-func (v attrs) set(key string, value interface{}) {
- v[key] = value
-}
-
-func (v attrs) del(key string) {
- delete(v, key)
-}
-
-// Init returns request with new context and attribute bag.
-func Init(r *http.Request) *http.Request {
- return r.WithContext(context.WithValue(r.Context(), PsrContextKey, attrs{}))
-}
-
-// All returns all context attributes.
-func All(r *http.Request) map[string]interface{} {
- v := r.Context().Value(PsrContextKey)
- if v == nil {
- return attrs{}
- }
-
- return v.(attrs)
-}
-
-// Get gets the value from request context. It replaces any existing
-// values.
-func Get(r *http.Request, key string) interface{} {
- v := r.Context().Value(PsrContextKey)
- if v == nil {
- return nil
- }
-
- return v.(attrs).get(key)
-}
-
-// Set sets the key to value. It replaces any existing
-// values. Context specific.
-func Set(r *http.Request, key string, value interface{}) error {
- v := r.Context().Value(PsrContextKey)
- if v == nil {
- return errors.New("unable to find `psr:attributes` context key")
- }
-
- v.(attrs).set(key, value)
- return nil
-}
-
-// Delete deletes values associated with attribute key.
-func (v attrs) Delete(key string) {
- if v == nil {
- return
- }
-
- v.del(key)
-}
diff --git a/plugins/http/config.go b/plugins/http/config.go
deleted file mode 100644
index 00d2940b..00000000
--- a/plugins/http/config.go
+++ /dev/null
@@ -1,291 +0,0 @@
-package http
-
-import (
- "net"
- "os"
- "runtime"
- "strings"
- "time"
-
- "github.com/spiral/errors"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
-)
-
-type Cidrs []*net.IPNet
-
-func (c *Cidrs) IsTrusted(ip string) bool {
- if len(*c) == 0 {
- return false
- }
-
- i := net.ParseIP(ip)
- if i == nil {
- return false
- }
-
- for _, cird := range *c {
- if cird.Contains(i) {
- return true
- }
- }
-
- return false
-}
-
-// Config configures RoadRunner HTTP server.
-type Config struct {
- // Port and port to handle as http server.
- Address string
-
- // SSL defines https server options.
- SSL *SSLConfig
-
- // FCGI configuration. You can use FastCGI without HTTP server.
- FCGI *FCGIConfig
-
- // HTTP2 configuration
- HTTP2 *HTTP2Config
-
- // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
- MaxRequestSize uint64
-
- // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For
- TrustedSubnets []string
-
- // Uploads configures uploads configuration.
- Uploads *UploadsConfig
-
- // Pool configures worker pool.
- Pool *poolImpl.Config
-
- // Env is environment variables passed to the http pool
- Env map[string]string
-
- // List of the middleware names (order will be preserved)
- Middleware []string
-
- // slice of net.IPNet
- cidrs Cidrs
-}
-
-// FCGIConfig for FastCGI server.
-type FCGIConfig struct {
- // Address and port to handle as http server.
- Address string
-}
-
-// HTTP2Config HTTP/2 server customizations.
-type HTTP2Config struct {
- // Enable or disable HTTP/2 extension, default enable.
- Enabled bool
-
- // H2C enables HTTP/2 over TCP
- H2C bool
-
- // MaxConcurrentStreams defaults to 128.
- MaxConcurrentStreams uint32
-}
-
-// InitDefaults sets default values for HTTP/2 configuration.
-func (cfg *HTTP2Config) InitDefaults() error {
- cfg.Enabled = true
- cfg.MaxConcurrentStreams = 128
-
- return nil
-}
-
-// SSLConfig defines https server configuration.
-type SSLConfig struct {
- // Port to listen as HTTPS server, defaults to 443.
- Port int
-
- // Redirect when enabled forces all http connections to switch to https.
- Redirect bool
-
- // Key defined private server key.
- Key string
-
- // Cert is https certificate.
- Cert string
-
- // Root CA file
- RootCA string
-}
-
-// EnableHTTP is true when http server must run.
-func (c *Config) EnableHTTP() bool {
- return c.Address != ""
-}
-
-// EnableTLS returns true if pool must listen TLS connections.
-func (c *Config) EnableTLS() bool {
- return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != ""
-}
-
-// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL).
-func (c *Config) EnableHTTP2() bool {
- return c.HTTP2.Enabled
-}
-
-// EnableH2C when HTTP/2 extension must be enabled on TCP.
-func (c *Config) EnableH2C() bool {
- return c.HTTP2.H2C
-}
-
-// EnableFCGI is true when FastCGI server must be enabled.
-func (c *Config) EnableFCGI() bool {
- return c.FCGI.Address != ""
-}
-
-// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
-func (c *Config) InitDefaults() error {
- if c.Pool == nil {
- // default pool
- c.Pool = &poolImpl.Config{
- Debug: false,
- NumWorkers: int64(runtime.NumCPU()),
- MaxJobs: 1000,
- AllocateTimeout: time.Second * 60,
- DestroyTimeout: time.Second * 60,
- Supervisor: nil,
- }
- }
-
- if c.HTTP2 == nil {
- c.HTTP2 = &HTTP2Config{}
- }
-
- if c.FCGI == nil {
- c.FCGI = &FCGIConfig{}
- }
-
- if c.Uploads == nil {
- c.Uploads = &UploadsConfig{}
- }
-
- if c.SSL == nil {
- c.SSL = &SSLConfig{}
- }
-
- if c.SSL.Port == 0 {
- c.SSL.Port = 443
- }
-
- err := c.HTTP2.InitDefaults()
- if err != nil {
- return err
- }
- err = c.Uploads.InitDefaults()
- if err != nil {
- return err
- }
-
- if c.TrustedSubnets == nil {
- // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses
- c.TrustedSubnets = []string{
- "10.0.0.0/8",
- "127.0.0.0/8",
- "172.16.0.0/12",
- "192.168.0.0/16",
- "::1/128",
- "fc00::/7",
- "fe80::/10",
- }
- }
-
- cidrs, err := ParseCIDRs(c.TrustedSubnets)
- if err != nil {
- return err
- }
- c.cidrs = cidrs
-
- return c.Valid()
-}
-
-func ParseCIDRs(subnets []string) (Cidrs, error) {
- c := make(Cidrs, 0, len(subnets))
- for _, cidr := range subnets {
- _, cr, err := net.ParseCIDR(cidr)
- if err != nil {
- return nil, err
- }
-
- c = append(c, cr)
- }
-
- return c, nil
-}
-
-// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For
-func (c *Config) IsTrusted(ip string) bool {
- if c.cidrs == nil {
- return false
- }
-
- i := net.ParseIP(ip)
- if i == nil {
- return false
- }
-
- for _, cird := range c.cidrs {
- if cird.Contains(i) {
- return true
- }
- }
-
- return false
-}
-
-// Valid validates the configuration.
-func (c *Config) Valid() error {
- const op = errors.Op("validation")
- if c.Uploads == nil {
- return errors.E(op, errors.Str("malformed uploads config"))
- }
-
- if c.HTTP2 == nil {
- return errors.E(op, errors.Str("malformed http2 config"))
- }
-
- if c.Pool == nil {
- return errors.E(op, "malformed pool config")
- }
-
- if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() {
- return errors.E(op, errors.Str("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)"))
- }
-
- if c.Address != "" && !strings.Contains(c.Address, ":") {
- return errors.E(op, errors.Str("malformed http server address"))
- }
-
- if c.EnableTLS() {
- if _, err := os.Stat(c.SSL.Key); err != nil {
- if os.IsNotExist(err) {
- return errors.E(op, errors.Errorf("key file '%s' does not exists", c.SSL.Key))
- }
-
- return err
- }
-
- if _, err := os.Stat(c.SSL.Cert); err != nil {
- if os.IsNotExist(err) {
- return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.SSL.Cert))
- }
-
- return err
- }
-
- // RootCA is optional, but if provided - check it
- if c.SSL.RootCA != "" {
- if _, err := os.Stat(c.SSL.RootCA); err != nil {
- if os.IsNotExist(err) {
- return errors.E(op, errors.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA))
- }
- return err
- }
- }
- }
-
- return nil
-}
diff --git a/plugins/http/constants.go b/plugins/http/constants.go
deleted file mode 100644
index 773d1f46..00000000
--- a/plugins/http/constants.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package http
-
-import "net/http"
-
-var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
-var TrailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/plugins/http/errors.go b/plugins/http/errors.go
deleted file mode 100644
index fb8762ef..00000000
--- a/plugins/http/errors.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// +build !windows
-
-package http
-
-import (
- "errors"
- "net"
- "os"
- "syscall"
-)
-
-// Broken pipe
-var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
-
-// handleWriteError just check if error was caused by aborted connection on linux
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if syscallErr.Err == syscall.EPIPE {
- return errEPIPE
- }
- }
- }
- return err
-}
diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go
deleted file mode 100644
index 3d0ba04c..00000000
--- a/plugins/http/errors_windows.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// +build windows
-
-package http
-
-import (
- "errors"
- "net"
- "os"
- "syscall"
-)
-
-//Software caused connection abort.
-//An established connection was aborted by the software in your host computer,
-//possibly due to a data transmission time-out or protocol error.
-var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
-
-// handleWriteError just check if error was caused by aborted connection on windows
-func handleWriteError(err error) error {
- if netErr, ok2 := err.(*net.OpError); ok2 {
- if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
- if syscallErr.Err == syscall.WSAECONNABORTED {
- return errEPIPE
- }
- }
- }
- return err
-}
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
deleted file mode 100644
index 08cee661..00000000
--- a/plugins/http/handler.go
+++ /dev/null
@@ -1,240 +0,0 @@
-package http
-
-import (
- "net"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "github.com/hashicorp/go-multierror"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner-plugins/logger"
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
-)
-
-const (
- // EventResponse thrown after the request been processed. See ErrorEvent as payload.
- EventResponse = iota + 500
-
- // EventError thrown on any non job error provided by road runner server.
- EventError
-)
-
-const MB = 1024 * 1024
-
-type Handle interface {
- AddListener(l events.EventListener)
- ServeHTTP(w http.ResponseWriter, r *http.Request)
-}
-
-// ErrorEvent represents singular http error event.
-type ErrorEvent struct {
- // Request contains client request, must not be stored.
- Request *http.Request
-
- // Error - associated error, if any.
- Error error
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *ErrorEvent) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// ResponseEvent represents singular http response event.
-type ResponseEvent struct {
- // Request contains client request, must not be stored.
- Request *Request
-
- // Response contains service response.
- Response *Response
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *ResponseEvent) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
-// parsed files and query, payload will include parsed form dataTree (if any).
-type handler struct {
- maxRequestSize uint64
- uploads UploadsConfig
- trusted Cidrs
- log logger.Logger
- pool pool.Pool
- mul sync.Mutex
- lsn events.EventListener
-}
-
-func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) {
- if pool == nil {
- return nil, errors.E(errors.Str("pool should be initialized"))
- }
- return &handler{
- maxRequestSize: maxReqSize * MB,
- uploads: uploads,
- pool: pool,
- trusted: trusted,
- }, nil
-}
-
-// Listen attaches handler event controller.
-func (h *handler) AddListener(l events.EventListener) {
- h.mul.Lock()
- defer h.mul.Unlock()
-
- h.lsn = l
-}
-
-// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
-func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- const op = errors.Op("ServeHTTP")
- start := time.Now()
-
- // validating request size
- if h.maxRequestSize != 0 {
- err := h.maxSize(w, r, start, op)
- if err != nil {
- return
- }
- }
-
- req, err := NewRequest(r, h.uploads)
- if err != nil {
- h.handleError(w, r, err, start)
- return
- }
-
- // proxy IP resolution
- h.resolveIP(req)
-
- req.Open(h.log)
- defer req.Close(h.log)
-
- p, err := req.Payload()
- if err != nil {
- h.handleError(w, r, err, start)
- return
- }
-
- rsp, err := h.pool.Exec(p)
- if err != nil {
- h.handleError(w, r, err, start)
- return
- }
-
- resp, err := NewResponse(rsp)
- if err != nil {
- h.handleError(w, r, err, start)
- return
- }
-
- h.handleResponse(req, resp, start)
- err = resp.Write(w)
- if err != nil {
- h.handleError(w, r, err, start)
- }
-}
-
-func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error {
- if length := r.Header.Get("content-length"); length != "" {
- if size, err := strconv.ParseInt(length, 10, 64); err != nil {
- h.handleError(w, r, err, start)
- return err
- } else if size > int64(h.maxRequestSize) {
- h.handleError(w, r, errors.E(op, errors.Str("request body max size is exceeded")), start)
- return err
- }
- }
- return nil
-}
-
-// handleError sends error.
-func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) {
- h.mul.Lock()
- defer h.mul.Unlock()
- // if pipe is broken, there is no sense to write the header
- // in this case we just report about error
- if err == errEPIPE {
- h.throw(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
- return
- }
- err = multierror.Append(err)
- // ResponseWriter is ok, write the error code
- w.WriteHeader(500)
- _, err2 := w.Write([]byte(err.Error()))
- // error during the writing to the ResponseWriter
- if err2 != nil {
- err = multierror.Append(err2, err)
- // concat original error with ResponseWriter error
- h.throw(ErrorEvent{Request: r, Error: errors.E(err), start: start, elapsed: time.Since(start)})
- return
- }
- h.throw(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
-}
-
-// handleResponse triggers response event.
-func (h *handler) handleResponse(req *Request, resp *Response, start time.Time) {
- h.throw(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
-}
-
-// throw invokes event handler if any.
-func (h *handler) throw(event interface{}) {
- if h.lsn != nil {
- h.lsn(event)
- }
-}
-
-// get real ip passing multiple proxy
-func (h *handler) resolveIP(r *Request) {
- if h.trusted.IsTrusted(r.RemoteAddr) == false {
- return
- }
-
- if r.Header.Get("X-Forwarded-For") != "" {
- ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
- ipCount := len(ips)
-
- for i := ipCount - 1; i >= 0; i-- {
- addr := strings.TrimSpace(ips[i])
- if net.ParseIP(addr) != nil {
- r.RemoteAddr = addr
- return
- }
- }
-
- return
- }
-
- // The logic here is the following:
- // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address
- // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
- // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
- // CF-Connecting-IP is an Enterprise feature and we check it last in order.
- // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
- if r.Header.Get("X-Real-Ip") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
- return
- }
-
- if r.Header.Get("True-Client-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
- return
- }
-
- if r.Header.Get("CF-Connecting-IP") != "" {
- r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
- }
-}
diff --git a/plugins/http/parse.go b/plugins/http/parse.go
deleted file mode 100644
index d4a1604b..00000000
--- a/plugins/http/parse.go
+++ /dev/null
@@ -1,147 +0,0 @@
-package http
-
-import (
- "net/http"
-)
-
-// MaxLevel defines maximum tree depth for incoming request data and files.
-const MaxLevel = 127
-
-type dataTree map[string]interface{}
-type fileTree map[string]interface{}
-
-// parseData parses incoming request body into data tree.
-func parseData(r *http.Request) dataTree {
- data := make(dataTree)
- if r.PostForm != nil {
- for k, v := range r.PostForm {
- data.push(k, v)
- }
- }
-
- if r.MultipartForm != nil {
- for k, v := range r.MultipartForm.Value {
- data.push(k, v)
- }
- }
-
- return data
-}
-
-// pushes value into data tree.
-func (d dataTree) push(k string, v []string) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-}
-
-// mount mounts data tree recursively.
-func (d dataTree) mount(i []string, v []string) {
- if len(i) == 1 {
- // single value context (last element)
- d[i[0]] = v[len(v)-1]
- return
- }
-
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
-
- if p, ok := d[i[0]]; ok {
- p.(dataTree).mount(i[1:], v)
- return
- }
-
- d[i[0]] = make(dataTree)
- d[i[0]].(dataTree).mount(i[1:], v)
-}
-
-// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
-func parseUploads(r *http.Request, cfg UploadsConfig) *Uploads {
- u := &Uploads{
- cfg: cfg,
- tree: make(fileTree),
- list: make([]*FileUpload, 0),
- }
-
- for k, v := range r.MultipartForm.File {
- files := make([]*FileUpload, 0, len(v))
- for _, f := range v {
- files = append(files, NewUpload(f))
- }
-
- u.list = append(u.list, files...)
- u.tree.push(k, files)
- }
-
- return u
-}
-
-// pushes new file upload into it's proper place.
-func (d fileTree) push(k string, v []*FileUpload) {
- keys := FetchIndexes(k)
- if len(keys) <= MaxLevel {
- d.mount(keys, v)
- }
-}
-
-// mount mounts data tree recursively.
-func (d fileTree) mount(i []string, v []*FileUpload) {
- if len(i) == 1 {
- // single value context
- d[i[0]] = v[0]
- return
- }
-
- if len(i) == 2 && i[1] == "" {
- // non associated array of elements
- d[i[0]] = v
- return
- }
-
- if p, ok := d[i[0]]; ok {
- p.(fileTree).mount(i[1:], v)
- return
- }
-
- d[i[0]] = make(fileTree)
- d[i[0]].(fileTree).mount(i[1:], v)
-}
-
-// FetchIndexes parses input name and splits it into separate indexes list.
-func FetchIndexes(s string) []string {
- var (
- pos int
- ch string
- keys = make([]string, 1)
- )
-
- for _, c := range s {
- ch = string(c)
- switch ch {
- case " ":
- // ignore all spaces
- continue
- case "[":
- pos = 1
- continue
- case "]":
- if pos == 1 {
- keys = append(keys, "")
- }
- pos = 2
- default:
- if pos == 1 || pos == 2 {
- keys = append(keys, "")
- }
-
- keys[len(keys)-1] += ch
- pos = 0
- }
- }
-
- return keys
-}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
deleted file mode 100644
index 301468d1..00000000
--- a/plugins/http/plugin.go
+++ /dev/null
@@ -1,528 +0,0 @@
-package http
-
-import (
- "context"
- "crypto/tls"
- "crypto/x509"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/http/fcgi"
- "net/url"
- "strings"
- "sync"
-
- "github.com/hashicorp/go-multierror"
- "github.com/spiral/endure"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner-plugins/checker"
- "github.com/spiral/roadrunner-plugins/config"
- "github.com/spiral/roadrunner-plugins/logger"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/util"
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/h2c"
- "golang.org/x/sys/cpu"
-)
-
-const (
- // PluginName declares plugin name.
- PluginName = "http"
-
- // EventInitSSL thrown at moment of https initialization. SSL server passed as context.
- EventInitSSL = 750
-)
-
-// Middleware interface
-type Middleware interface {
- Middleware(f http.Handler) http.HandlerFunc
-}
-
-type middleware map[string]Middleware
-
-// Service manages pool, http servers.
-type Plugin struct {
- sync.RWMutex
-
- configurer config.Configurer
- server server.Server
- log logger.Logger
-
- cfg *Config
- // middlewares to chain
- mdwr middleware
-
- // Pool which attached to all servers
- pool pool.Pool
-
- // servers RR handler
- handler Handle
-
- // servers
- http *http.Server
- https *http.Server
- fcgi *http.Server
-}
-
-// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
-// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
- const op = errors.Op("http Init")
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return errors.E(op, err)
- }
-
- err = s.cfg.InitDefaults()
- if err != nil {
- return errors.E(op, err)
- }
-
- s.configurer = cfg
- s.log = log
- s.mdwr = make(map[string]Middleware)
-
- if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() {
- return errors.E(op, errors.Disabled)
- }
-
- s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
- if err != nil {
- return errors.E(op, err)
- }
-
- s.server = server
-
- return nil
-}
-
-func (s *Plugin) logCallback(event interface{}) {
- if ev, ok := event.(ResponseEvent); ok {
- s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr)
- }
-}
-
-// Serve serves the svc.
-func (s *Plugin) Serve() chan error {
- s.Lock()
- defer s.Unlock()
-
- const op = errors.Op("serve http")
- errCh := make(chan error, 2)
-
- var err error
- s.handler, err = NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.cidrs,
- s.pool,
- )
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- s.handler.AddListener(s.logCallback)
-
- if s.cfg.EnableHTTP() {
- if s.cfg.EnableH2C() {
- s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})}
- } else {
- s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
- }
- }
-
- if s.cfg.EnableTLS() {
- s.https = s.initSSL()
- if s.cfg.SSL.RootCA != "" {
- err = s.appendRootCa()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
- }
-
- if s.cfg.EnableHTTP2() {
- if err := s.initHTTP2(); err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
- }
- }
-
- if s.cfg.EnableFCGI() {
- s.fcgi = &http.Server{Handler: s}
- }
-
- // apply middlewares before starting the server
- if len(s.mdwr) > 0 {
- s.addMiddlewares()
- }
-
- if s.http != nil {
- go func() {
- httpErr := s.http.ListenAndServe()
- if httpErr != nil && httpErr != http.ErrServerClosed {
- errCh <- errors.E(op, httpErr)
- return
- }
- }()
- }
-
- if s.https != nil {
- go func() {
- httpErr := s.https.ListenAndServeTLS(
- s.cfg.SSL.Cert,
- s.cfg.SSL.Key,
- )
-
- if httpErr != nil && httpErr != http.ErrServerClosed {
- errCh <- errors.E(op, httpErr)
- return
- }
- }()
- }
-
- if s.fcgi != nil {
- go func() {
- httpErr := s.serveFCGI()
- if httpErr != nil && httpErr != http.ErrServerClosed {
- errCh <- errors.E(op, httpErr)
- return
- }
- }()
- }
-
- return errCh
-}
-
-// Stop stops the http.
-func (s *Plugin) Stop() error {
- s.Lock()
- defer s.Unlock()
-
- var err error
- if s.fcgi != nil {
- err = s.fcgi.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the fcgi server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
- }
- }
-
- if s.https != nil {
- err = s.https.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the https server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
- }
- }
-
- if s.http != nil {
- err = s.http.Shutdown(context.Background())
- if err != nil && err != http.ErrServerClosed {
- s.log.Error("error shutting down the http server", "error", err)
- // write error and try to stop other transport
- err = multierror.Append(err)
- }
- }
-
- s.pool.Destroy(context.Background())
-
- return err
-}
-
-// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
-func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- if headerContainsUpgrade(r, s) {
- http.Error(w, "server does not support upgrade header", http.StatusInternalServerError)
- return
- }
-
- if s.redirect(w, r) {
- return
- }
-
- if s.https != nil && r.TLS != nil {
- w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
- }
-
- r = attributes.Init(r)
- // protect the case, when user send Reset and we are replacing handler with pool
- s.RLock()
- s.handler.ServeHTTP(w, r)
- s.RUnlock()
-}
-
-// Server returns associated pool workers
-func (s *Plugin) Workers() []worker.BaseProcess {
- return s.pool.Workers()
-}
-
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-func (s *Plugin) Reset() error {
- s.Lock()
- defer s.Unlock()
- const op = errors.Op("http reset")
- s.log.Info("HTTP plugin got restart request. Restarting...")
- s.pool.Destroy(context.Background())
- s.pool = nil
-
- // re-read the config
- err := s.configurer.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return errors.E(op, err)
- }
-
- s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{
- Debug: s.cfg.Pool.Debug,
- NumWorkers: s.cfg.Pool.NumWorkers,
- MaxJobs: s.cfg.Pool.MaxJobs,
- AllocateTimeout: s.cfg.Pool.AllocateTimeout,
- DestroyTimeout: s.cfg.Pool.DestroyTimeout,
- Supervisor: s.cfg.Pool.Supervisor,
- }, s.cfg.Env, s.logCallback)
- if err != nil {
- return errors.E(op, err)
- }
-
- s.log.Info("HTTP listeners successfully re-added")
-
- s.log.Info("HTTP workers Pool successfully restarted")
- s.handler, err = NewHandler(
- s.cfg.MaxRequestSize,
- *s.cfg.Uploads,
- s.cfg.cidrs,
- s.pool,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- s.log.Info("HTTP plugin successfully restarted")
- return nil
-}
-
-func (s *Plugin) Collects() []interface{} {
- return []interface{}{
- s.AddMiddleware,
- }
-}
-
-func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
- s.mdwr[name.Name()] = m
-}
-
-// Status return status of the particular plugin
-func (s *Plugin) Status() checker.Status {
- workers := s.Workers()
- for i := 0; i < len(workers); i++ {
- if workers[i].State().IsActive() {
- return checker.Status{
- Code: http.StatusOK,
- }
- }
- }
- // if there are no workers, threat this as error
- return checker.Status{
- Code: http.StatusInternalServerError,
- }
-}
-
-func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) bool {
- if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
- target := &url.URL{
- Scheme: "https",
- Host: s.tlsAddr(r.Host, false),
- Path: r.URL.Path,
- RawQuery: r.URL.RawQuery,
- }
-
- http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect)
- return true
- }
- return false
-}
-
-func headerContainsUpgrade(r *http.Request, s *Plugin) bool {
- if _, ok := r.Header["Upgrade"]; ok {
- // https://golang.org/pkg/net/http/#Hijacker
- s.log.Error("server does not support Upgrade header")
- return true
- }
- return false
-}
-
-// append RootCA to the https server TLS config
-func (s *Plugin) appendRootCa() error {
- const op = errors.Op("append root CA")
- rootCAs, err := x509.SystemCertPool()
- if err != nil {
- return nil
- }
- if rootCAs == nil {
- rootCAs = x509.NewCertPool()
- }
-
- CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA)
- if err != nil {
- return err
- }
-
- // should append our CA cert
- ok := rootCAs.AppendCertsFromPEM(CA)
- if !ok {
- return errors.E(op, errors.Str("could not append Certs from PEM"))
- }
- // disable "G402 (CWE-295): TLS MinVersion too low. (Confidence: HIGH, Severity: HIGH)"
- // #nosec G402
- cfg := &tls.Config{
- InsecureSkipVerify: false,
- RootCAs: rootCAs,
- }
- s.http.TLSConfig = cfg
-
- return nil
-}
-
-// Init https server
-func (s *Plugin) initSSL() *http.Server {
- var topCipherSuites []uint16
- var defaultCipherSuitesTLS13 []uint16
-
- hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ
- hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
- // Keep in sync with crypto/aes/cipher_s390x.go.
- hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM)
-
- hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
-
- if hasGCMAsm {
- // If AES-GCM hardware is provided then prioritise AES-GCM
- // cipher suites.
- topCipherSuites = []uint16{
- tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
- tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
- tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
- tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
- tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
- tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
- }
- defaultCipherSuitesTLS13 = []uint16{
- tls.TLS_AES_128_GCM_SHA256,
- tls.TLS_CHACHA20_POLY1305_SHA256,
- tls.TLS_AES_256_GCM_SHA384,
- }
- } else {
- // Without AES-GCM hardware, we put the ChaCha20-Poly1305
- // cipher suites first.
- topCipherSuites = []uint16{
- tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
- tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
- tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
- tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
- tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
- tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
- }
- defaultCipherSuitesTLS13 = []uint16{
- tls.TLS_CHACHA20_POLY1305_SHA256,
- tls.TLS_AES_128_GCM_SHA256,
- tls.TLS_AES_256_GCM_SHA384,
- }
- }
-
- DefaultCipherSuites := make([]uint16, 0, 22)
- DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
- DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
-
- server := &http.Server{
- Addr: s.tlsAddr(s.cfg.Address, true),
- Handler: s,
- TLSConfig: &tls.Config{
- CurvePreferences: []tls.CurveID{
- tls.CurveP256,
- tls.CurveP384,
- tls.CurveP521,
- tls.X25519,
- },
- CipherSuites: DefaultCipherSuites,
- MinVersion: tls.VersionTLS12,
- PreferServerCipherSuites: true,
- },
- }
-
- return server
-}
-
-// init http/2 server
-func (s *Plugin) initHTTP2() error {
- return http2.ConfigureServer(s.https, &http2.Server{
- MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams,
- })
-}
-
-// serveFCGI starts FastCGI server.
-func (s *Plugin) serveFCGI() error {
- l, err := util.CreateListener(s.cfg.FCGI.Address)
- if err != nil {
- return err
- }
-
- err = fcgi.Serve(l, s.fcgi.Handler)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// tlsAddr replaces listen or host port with port configured by SSL config.
-func (s *Plugin) tlsAddr(host string, forcePort bool) string {
- // remove current forcePort first
- host = strings.Split(host, ":")[0]
-
- if forcePort || s.cfg.SSL.Port != 443 {
- host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port)
- }
-
- return host
-}
-
-func (s *Plugin) addMiddlewares() {
- if s.http != nil {
- applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log)
- }
- if s.https != nil {
- applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log)
- }
-
- if s.fcgi != nil {
- applyMiddlewares(s.fcgi, s.mdwr, s.cfg.Middleware, s.log)
- }
-}
-
-func applyMiddlewares(server *http.Server, middlewares map[string]Middleware, order []string, log logger.Logger) {
- for i := 0; i < len(order); i++ {
- if mdwr, ok := middlewares[order[i]]; ok {
- server.Handler = mdwr.Middleware(server.Handler)
- } else {
- log.Warn("requested middleware does not exist", "requested", order[i])
- }
- }
-}
diff --git a/plugins/http/request.go b/plugins/http/request.go
deleted file mode 100644
index 16243bcc..00000000
--- a/plugins/http/request.go
+++ /dev/null
@@ -1,186 +0,0 @@
-package http
-
-import (
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "strings"
-
- j "github.com/json-iterator/go"
- "github.com/spiral/roadrunner-plugins/logger"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
-)
-
-var json = j.ConfigCompatibleWithStandardLibrary
-
-const (
- defaultMaxMemory = 32 << 20 // 32 MB
- contentNone = iota + 900
- contentStream
- contentMultipart
- contentFormData
-)
-
-// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
-type Request struct {
- // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
- RemoteAddr string `json:"remoteAddr"`
-
- // Protocol includes HTTP protocol version.
- Protocol string `json:"protocol"`
-
- // Method contains name of HTTP method used for the request.
- Method string `json:"method"`
-
- // URI contains full request URI with scheme and query.
- URI string `json:"uri"`
-
- // Header contains list of request headers.
- Header http.Header `json:"headers"`
-
- // Cookies contains list of request cookies.
- Cookies map[string]string `json:"cookies"`
-
- // RawQuery contains non parsed query string (to be parsed on php end).
- RawQuery string `json:"rawQuery"`
-
- // Parsed indicates that request body has been parsed on RR end.
- Parsed bool `json:"parsed"`
-
- // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
- Uploads *Uploads `json:"uploads"`
-
- // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
- Attributes map[string]interface{} `json:"attributes"`
-
- // request body can be parsedData or []byte
- body interface{}
-}
-
-func fetchIP(pair string) string {
- if !strings.ContainsRune(pair, ':') {
- return pair
- }
-
- addr, _, _ := net.SplitHostPort(pair)
- return addr
-}
-
-// NewRequest creates new PSR7 compatible request using net/http request.
-func NewRequest(r *http.Request, cfg UploadsConfig) (*Request, error) {
- req := &Request{
- RemoteAddr: fetchIP(r.RemoteAddr),
- Protocol: r.Proto,
- Method: r.Method,
- URI: uri(r),
- Header: r.Header,
- Cookies: make(map[string]string),
- RawQuery: r.URL.RawQuery,
- Attributes: attributes.All(r),
- }
-
- for _, c := range r.Cookies() {
- if v, err := url.QueryUnescape(c.Value); err == nil {
- req.Cookies[c.Name] = v
- }
- }
-
- switch req.contentType() {
- case contentNone:
- return req, nil
-
- case contentStream:
- var err error
- req.body, err = ioutil.ReadAll(r.Body)
- return req, err
-
- case contentMultipart:
- if err := r.ParseMultipartForm(defaultMaxMemory); err != nil {
- return nil, err
- }
-
- req.Uploads = parseUploads(r, cfg)
- fallthrough
- case contentFormData:
- if err := r.ParseForm(); err != nil {
- return nil, err
- }
-
- req.body = parseData(r)
- }
-
- req.Parsed = true
- return req, nil
-}
-
-// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
-
- r.Uploads.Open(log)
-}
-
-// Close clears all temp file uploads
-func (r *Request) Close(log logger.Logger) {
- if r.Uploads == nil {
- return
- }
-
- r.Uploads.Clear(log)
-}
-
-// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
-// files prior to calling this method.
-func (r *Request) Payload() (payload.Payload, error) {
- p := payload.Payload{}
-
- var err error
- if p.Context, err = json.Marshal(r); err != nil {
- return payload.Payload{}, err
- }
-
- if r.Parsed {
- if p.Body, err = json.Marshal(r.body); err != nil {
- return payload.Payload{}, err
- }
- } else if r.body != nil {
- p.Body = r.body.([]byte)
- }
-
- return p, nil
-}
-
-// contentType returns the payload content type.
-func (r *Request) contentType() int {
- if r.Method == "HEAD" || r.Method == "OPTIONS" {
- return contentNone
- }
-
- ct := r.Header.Get("content-type")
- if strings.Contains(ct, "application/x-www-form-urlencoded") {
- return contentFormData
- }
-
- if strings.Contains(ct, "multipart/form-data") {
- return contentMultipart
- }
-
- return contentStream
-}
-
-// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
-func uri(r *http.Request) string {
- if r.URL.Host != "" {
- return r.URL.String()
- }
- if r.TLS != nil {
- return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
- }
-
- return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
-}
diff --git a/plugins/http/response.go b/plugins/http/response.go
deleted file mode 100644
index 17049ce1..00000000
--- a/plugins/http/response.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package http
-
-import (
- "io"
- "net/http"
- "strings"
- "sync"
-
- "github.com/spiral/roadrunner/v2/pkg/payload"
-)
-
-// Response handles PSR7 response logic.
-type Response struct {
- // Status contains response status.
- Status int `json:"status"`
-
- // Header contains list of response headers.
- Headers map[string][]string `json:"headers"`
-
- // associated Body payload.
- Body interface{}
- sync.Mutex
-}
-
-// NewResponse creates new response based on given pool payload.
-func NewResponse(p payload.Payload) (*Response, error) {
- r := &Response{Body: p.Body}
- if err := json.Unmarshal(p.Context, r); err != nil {
- return nil, err
- }
-
- return r, nil
-}
-
-// Write writes response headers, status and body into ResponseWriter.
-func (r *Response) Write(w http.ResponseWriter) error {
- // INFO map is the reference type in golang
- p := handlePushHeaders(r.Headers)
- if pusher, ok := w.(http.Pusher); ok {
- for _, v := range p {
- err := pusher.Push(v, nil)
- if err != nil {
- return err
- }
- }
- }
-
- handleTrailers(r.Headers)
- for n, h := range r.Headers {
- for _, v := range h {
- w.Header().Add(n, v)
- }
- }
-
- w.WriteHeader(r.Status)
-
- if data, ok := r.Body.([]byte); ok {
- _, err := w.Write(data)
- if err != nil {
- return handleWriteError(err)
- }
- }
-
- if rc, ok := r.Body.(io.Reader); ok {
- if _, err := io.Copy(w, rc); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func handlePushHeaders(h map[string][]string) []string {
- var p []string
- pushHeader, ok := h[http2pushHeaderKey]
- if !ok {
- return p
- }
-
- p = append(p, pushHeader...)
-
- delete(h, http2pushHeaderKey)
-
- return p
-}
-
-func handleTrailers(h map[string][]string) {
- trailers, ok := h[TrailerHeaderKey]
- if !ok {
- return
- }
-
- for _, tr := range trailers {
- for _, n := range strings.Split(tr, ",") {
- n = strings.Trim(n, "\t ")
- if v, ok := h[n]; ok {
- h["Trailer:"+n] = v
-
- delete(h, n)
- }
- }
- }
-
- delete(h, TrailerHeaderKey)
-}
diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go
deleted file mode 100644
index 1f14cc0d..00000000
--- a/plugins/http/uploads.go
+++ /dev/null
@@ -1,158 +0,0 @@
-package http
-
-import (
- "github.com/spiral/roadrunner-plugins/logger"
-
- "io"
- "io/ioutil"
- "mime/multipart"
- "os"
- "sync"
-)
-
-const (
- // UploadErrorOK - no error, the file uploaded with success.
- UploadErrorOK = 0
-
- // UploadErrorNoFile - no file was uploaded.
- UploadErrorNoFile = 4
-
- // UploadErrorNoTmpDir - missing a temporary folder.
- UploadErrorNoTmpDir = 6
-
- // UploadErrorCantWrite - failed to write file to disk.
- UploadErrorCantWrite = 7
-
- // UploadErrorExtension - forbidden file extension.
- UploadErrorExtension = 8
-)
-
-// Uploads tree manages uploaded files tree and temporary files.
-type Uploads struct {
- // associated temp directory and forbidden extensions.
- cfg UploadsConfig
-
- // pre processed data tree for Uploads.
- tree fileTree
-
- // flat list of all file Uploads.
- list []*FileUpload
-}
-
-// MarshalJSON marshal tree tree into JSON.
-func (u *Uploads) MarshalJSON() ([]byte, error) {
- return json.Marshal(u.tree)
-}
-
-// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
-// will be handled individually.
-func (u *Uploads) Open(log logger.Logger) {
- var wg sync.WaitGroup
- for _, f := range u.list {
- wg.Add(1)
- go func(f *FileUpload) {
- defer wg.Done()
- err := f.Open(u.cfg)
- if err != nil && log != nil {
- log.Error("error opening the file", "err", err)
- }
- }(f)
- }
-
- wg.Wait()
-}
-
-// Clear deletes all temporary files.
-func (u *Uploads) Clear(log logger.Logger) {
- for _, f := range u.list {
- if f.TempFilename != "" && exists(f.TempFilename) {
- err := os.Remove(f.TempFilename)
- if err != nil && log != nil {
- log.Error("error removing the file", "err", err)
- }
- }
- }
-}
-
-// FileUpload represents singular file NewUpload.
-type FileUpload struct {
- // ID contains filename specified by the client.
- Name string `json:"name"`
-
- // Mime contains mime-type provided by the client.
- Mime string `json:"mime"`
-
- // Size of the uploaded file.
- Size int64 `json:"size"`
-
- // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
- Error int `json:"error"`
-
- // TempFilename points to temporary file location.
- TempFilename string `json:"tmpName"`
-
- // associated file header
- header *multipart.FileHeader
-}
-
-// NewUpload wraps net/http upload into PRS-7 compatible structure.
-func NewUpload(f *multipart.FileHeader) *FileUpload {
- return &FileUpload{
- Name: f.Filename,
- Mime: f.Header.Get("Content-Type"),
- Error: UploadErrorOK,
- header: f,
- }
-}
-
-// Open moves file content into temporary file available for PHP.
-// NOTE:
-// There is 2 deferred functions, and in case of getting 2 errors from both functions
-// error from close of temp file would be overwritten by error from the main file
-// STACK
-// DEFER FILE CLOSE (2)
-// DEFER TMP CLOSE (1)
-func (f *FileUpload) Open(cfg UploadsConfig) (err error) {
- if cfg.Forbids(f.Name) {
- f.Error = UploadErrorExtension
- return nil
- }
-
- file, err := f.header.Open()
- if err != nil {
- f.Error = UploadErrorNoFile
- return err
- }
-
- defer func() {
- // close the main file
- err = file.Close()
- }()
-
- tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
- if err != nil {
- // most likely cause of this issue is missing tmp dir
- f.Error = UploadErrorNoTmpDir
- return err
- }
-
- f.TempFilename = tmp.Name()
- defer func() {
- // close the temp file
- err = tmp.Close()
- }()
-
- if f.Size, err = io.Copy(tmp, file); err != nil {
- f.Error = UploadErrorCantWrite
- }
-
- return err
-}
-
-// exists if file exists.
-func exists(path string) bool {
- if _, err := os.Stat(path); os.IsNotExist(err) {
- return false
- }
- return true
-}
diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go
deleted file mode 100644
index 4c20c8e8..00000000
--- a/plugins/http/uploads_config.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package http
-
-import (
- "os"
- "path"
- "strings"
-)
-
-// UploadsConfig describes file location and controls access to them.
-type UploadsConfig struct {
- // Dir contains name of directory to control access to.
- Dir string
-
- // Forbid specifies list of file extensions which are forbidden for access.
- // Example: .php, .exe, .bat, .htaccess and etc.
- Forbid []string
-}
-
-// InitDefaults sets missing values to their default values.
-func (cfg *UploadsConfig) InitDefaults() error {
- cfg.Forbid = []string{".php", ".exe", ".bat"}
- cfg.Dir = os.TempDir()
- return nil
-}
-
-// TmpDir returns temporary directory.
-func (cfg *UploadsConfig) TmpDir() string {
- if cfg.Dir != "" {
- return cfg.Dir
- }
-
- return os.TempDir()
-}
-
-// Forbids must return true if file extension is not allowed for the upload.
-func (cfg *UploadsConfig) Forbids(filename string) bool {
- ext := strings.ToLower(path.Ext(filename))
-
- for _, v := range cfg.Forbid {
- if ext == v {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
deleted file mode 100644
index 27139ae1..00000000
--- a/plugins/informer/interface.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package informer
-
-import "github.com/spiral/roadrunner/v2/interfaces/worker"
-
-// Informer used to get workers from particular plugin or set of plugins
-type Informer interface {
- Workers() []worker.BaseProcess
-}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
deleted file mode 100644
index e2da7d86..00000000
--- a/plugins/informer/plugin.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package informer
-
-import (
- "github.com/spiral/endure"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner-plugins/logger"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
-)
-
-const PluginName = "informer"
-
-type Plugin struct {
- registry map[string]Informer
- log logger.Logger
-}
-
-func (p *Plugin) Init(log logger.Logger) error {
- p.registry = make(map[string]Informer)
- p.log = log
- return nil
-}
-
-// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) {
- const op = errors.Op("get workers")
- svc, ok := p.registry[name]
- if !ok {
- return nil, errors.E(op, errors.Errorf("no such service: %s", name))
- }
-
- return svc.Workers(), nil
-}
-
-// CollectTarget resettable service.
-func (p *Plugin) CollectTarget(name endure.Named, r Informer) error {
- p.registry[name.Name()] = r
- return nil
-}
-
-// Collects declares services to be collected.
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.CollectTarget,
- }
-}
-
-// Name of the service.
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-// RPCService returns associated rpc service.
-func (p *Plugin) RPC() interface{} {
- return &rpc{srv: p, log: p.log}
-}
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
deleted file mode 100644
index d32d4e3a..00000000
--- a/plugins/informer/rpc.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package informer
-
-import (
- "github.com/spiral/roadrunner-plugins/logger"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/tools"
-)
-
-type rpc struct {
- srv *Plugin
- log logger.Logger
-}
-
-// WorkerList contains list of workers.
-type WorkerList struct {
- // Workers is list of workers.
- Workers []tools.ProcessState `json:"workers"`
-}
-
-// List all resettable services.
-func (rpc *rpc) List(_ bool, list *[]string) error {
- rpc.log.Debug("Started List method")
- *list = make([]string, 0, len(rpc.srv.registry))
-
- for name := range rpc.srv.registry {
- *list = append(*list, name)
- }
- rpc.log.Debug("list of services", "list", *list)
-
- rpc.log.Debug("successfully finished List method")
- return nil
-}
-
-// Workers state of a given service.
-func (rpc *rpc) Workers(service string, list *WorkerList) error {
- rpc.log.Debug("started Workers method", "service", service)
- workers, err := rpc.srv.Workers(service)
- if err != nil {
- return err
- }
-
- list.Workers = make([]tools.ProcessState, 0)
- for _, w := range workers {
- ps, err := tools.WorkerProcessState(w.(worker.BaseProcess))
- if err != nil {
- continue
- }
-
- list.Workers = append(list.Workers, ps)
- }
- rpc.log.Debug("list of workers", "workers", list.Workers)
- rpc.log.Debug("successfully finished Workers method")
- return nil
-}
diff --git a/plugins/server/config.go b/plugins/server/config.go
deleted file mode 100644
index 4bef3c5f..00000000
--- a/plugins/server/config.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package server
-
-import (
- "time"
-)
-
-// Config config combines factory, pool and cmd configurations.
-type Config struct {
- // Command to run as application.
- Command string
-
- // User to run application under.
- User string
-
- // Group to run application under.
- Group string
-
- // Env represents application environment.
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration. Defaults to 60s.
- RelayTimeout time.Duration
-}
-
-func (cfg *Config) InitDefaults() {
- if cfg.Relay == "" {
- cfg.Relay = "pipes"
- }
-
- if cfg.RelayTimeout == 0 {
- cfg.RelayTimeout = time.Second * 60
- }
-}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
deleted file mode 100644
index 9c1079ea..00000000
--- a/plugins/server/interface.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package server
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
-)
-
-type Env map[string]string
-
-// Server creates workers for the application.
-type Server interface {
- CmdFactory(env Env) (func() *exec.Cmd, error)
- NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error)
- NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error)
-}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
deleted file mode 100644
index 3d90c95b..00000000
--- a/plugins/server/plugin.go
+++ /dev/null
@@ -1,229 +0,0 @@
-package server
-
-import (
- "context"
- "fmt"
- "os"
- "os/exec"
- "strings"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner-plugins/config"
- "github.com/spiral/roadrunner-plugins/logger"
-
- // core imports
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/socket"
- "github.com/spiral/roadrunner/v2/util"
-)
-
-const PluginName = "server"
-
-// Plugin manages worker
-type Plugin struct {
- cfg Config
- log logger.Logger
- factory worker.Factory
-}
-
-// Init application provider.
-func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("Init")
- err := cfg.UnmarshalKey(PluginName, &server.cfg)
- if err != nil {
- return errors.E(op, errors.Init, err)
- }
- server.cfg.InitDefaults()
- server.log = log
-
- server.factory, err = server.initFactory()
- if err != nil {
- return errors.E(errors.Op("Init factory"), err)
- }
-
- return nil
-}
-
-// Name contains service name.
-func (server *Plugin) Name() string {
- return PluginName
-}
-
-func (server *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- return errCh
-}
-
-func (server *Plugin) Stop() error {
- if server.factory == nil {
- return nil
- }
-
- return server.factory.Close()
-}
-
-// CmdFactory provides worker command factory associated with given context.
-func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
- const op = errors.Op("cmd factory")
- var cmdArgs []string
-
- // create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
- if len(cmdArgs) < 2 {
- return nil, errors.E(op, errors.Str("should be in form of `php <script>"))
- }
- if cmdArgs[0] != "php" {
- return nil, errors.E(op, errors.Str("first arg in command should be `php`"))
- }
-
- _, err := os.Stat(cmdArgs[1])
- if err != nil {
- return nil, errors.E(op, err)
- }
- return func() *exec.Cmd {
- cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
- util.IsolateProcess(cmd)
-
- // if user is not empty, and OS is linux or macos
- // execute php worker from that particular user
- if server.cfg.User != "" {
- err := util.ExecuteFromUser(cmd, server.cfg.User)
- if err != nil {
- return nil
- }
- }
-
- cmd.Env = server.setEnv(env)
-
- return cmd
- }, nil
-}
-
-// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) {
- const op = errors.Op("new worker")
-
- list := make([]events.EventListener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
-
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return w, nil
-}
-
-// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) {
- const op = errors.Op("server plugins new worker pool")
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- list := make([]events.EventListener, 0, len(listeners))
- list = append(list, server.collectPoolLogs)
-
- p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return p, nil
-}
-
-// creates relay and worker factory.
-func (server *Plugin) initFactory() (worker.Factory, error) {
- const op = errors.Op("network factory init")
- if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
- return pipe.NewPipeFactory(), nil
- }
-
- dsn := strings.Split(server.cfg.Relay, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-
- lsn, err := util.CreateListener(server.cfg.Relay)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
-
- switch dsn[0] {
- // sockets group
- case "unix":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
- case "tcp":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
- default:
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-}
-
-func (server *Plugin) setEnv(e Env) []string {
- env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay))
- for k, v := range e {
- env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
- }
-
- return env
-}
-
-func (server *Plugin) collectPoolLogs(event interface{}) {
- if we, ok := event.(events.PoolEvent); ok {
- switch we.Event {
- case events.EventMaxMemory:
- server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventNoFreeWorkers:
- server.log.Info("no free workers in pool", "error", we.Payload.(error).Error())
- case events.EventPoolError:
- server.log.Info("pool error", "error", we.Payload.(error).Error())
- case events.EventSupervisorError:
- server.log.Info("pool supervizor error", "error", we.Payload.(error).Error())
- case events.EventTTL:
- server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerConstruct:
- if _, ok := we.Payload.(error); ok {
- server.log.Error("worker construction error", "error", we.Payload.(error).Error())
- return
- }
- server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerDestruct:
- server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventExecTTL:
- server.log.Info("EVENT EXEC TTL PLACEHOLDER")
- case events.EventIdleTTL:
- server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- }
- }
-
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
- case events.EventWorkerLog:
- server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
- }
- }
-}
-
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
- case events.EventWorkerLog:
- server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
- }
- }
-}