diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/attributes/attributes.go | 85 | ||||
-rw-r--r-- | plugins/http/config.go | 291 | ||||
-rw-r--r-- | plugins/http/constants.go | 6 | ||||
-rw-r--r-- | plugins/http/errors.go | 25 | ||||
-rw-r--r-- | plugins/http/errors_windows.go | 27 | ||||
-rw-r--r-- | plugins/http/handler.go | 240 | ||||
-rw-r--r-- | plugins/http/parse.go | 147 | ||||
-rw-r--r-- | plugins/http/plugin.go | 528 | ||||
-rw-r--r-- | plugins/http/request.go | 186 | ||||
-rw-r--r-- | plugins/http/response.go | 105 | ||||
-rw-r--r-- | plugins/http/uploads.go | 158 | ||||
-rw-r--r-- | plugins/http/uploads_config.go | 46 | ||||
-rw-r--r-- | plugins/informer/interface.go | 8 | ||||
-rw-r--r-- | plugins/informer/plugin.go | 55 | ||||
-rw-r--r-- | plugins/informer/rpc.go | 54 | ||||
-rw-r--r-- | plugins/server/config.go | 39 | ||||
-rw-r--r-- | plugins/server/interface.go | 20 | ||||
-rw-r--r-- | plugins/server/plugin.go | 229 |
18 files changed, 0 insertions, 2249 deletions
diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go deleted file mode 100644 index 4c453766..00000000 --- a/plugins/http/attributes/attributes.go +++ /dev/null @@ -1,85 +0,0 @@ -package attributes - -import ( - "context" - "errors" - "net/http" -) - -// contextKey is a value for use with context.WithValue. It's used as -// a pointer so it fits in an interface{} without allocation. -type contextKey struct { - name string -} - -func (k *contextKey) String() string { return k.name } - -var ( - // PsrContextKey is a context key. It can be used in the http attributes - PsrContextKey = &contextKey{"psr_attributes"} -) - -type attrs map[string]interface{} - -func (v attrs) get(key string) interface{} { - if v == nil { - return "" - } - - return v[key] -} - -func (v attrs) set(key string, value interface{}) { - v[key] = value -} - -func (v attrs) del(key string) { - delete(v, key) -} - -// Init returns request with new context and attribute bag. -func Init(r *http.Request) *http.Request { - return r.WithContext(context.WithValue(r.Context(), PsrContextKey, attrs{})) -} - -// All returns all context attributes. -func All(r *http.Request) map[string]interface{} { - v := r.Context().Value(PsrContextKey) - if v == nil { - return attrs{} - } - - return v.(attrs) -} - -// Get gets the value from request context. It replaces any existing -// values. -func Get(r *http.Request, key string) interface{} { - v := r.Context().Value(PsrContextKey) - if v == nil { - return nil - } - - return v.(attrs).get(key) -} - -// Set sets the key to value. It replaces any existing -// values. Context specific. -func Set(r *http.Request, key string, value interface{}) error { - v := r.Context().Value(PsrContextKey) - if v == nil { - return errors.New("unable to find `psr:attributes` context key") - } - - v.(attrs).set(key, value) - return nil -} - -// Delete deletes values associated with attribute key. -func (v attrs) Delete(key string) { - if v == nil { - return - } - - v.del(key) -} diff --git a/plugins/http/config.go b/plugins/http/config.go deleted file mode 100644 index 00d2940b..00000000 --- a/plugins/http/config.go +++ /dev/null @@ -1,291 +0,0 @@ -package http - -import ( - "net" - "os" - "runtime" - "strings" - "time" - - "github.com/spiral/errors" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" -) - -type Cidrs []*net.IPNet - -func (c *Cidrs) IsTrusted(ip string) bool { - if len(*c) == 0 { - return false - } - - i := net.ParseIP(ip) - if i == nil { - return false - } - - for _, cird := range *c { - if cird.Contains(i) { - return true - } - } - - return false -} - -// Config configures RoadRunner HTTP server. -type Config struct { - // Port and port to handle as http server. - Address string - - // SSL defines https server options. - SSL *SSLConfig - - // FCGI configuration. You can use FastCGI without HTTP server. - FCGI *FCGIConfig - - // HTTP2 configuration - HTTP2 *HTTP2Config - - // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. - MaxRequestSize uint64 - - // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For - TrustedSubnets []string - - // Uploads configures uploads configuration. - Uploads *UploadsConfig - - // Pool configures worker pool. - Pool *poolImpl.Config - - // Env is environment variables passed to the http pool - Env map[string]string - - // List of the middleware names (order will be preserved) - Middleware []string - - // slice of net.IPNet - cidrs Cidrs -} - -// FCGIConfig for FastCGI server. -type FCGIConfig struct { - // Address and port to handle as http server. - Address string -} - -// HTTP2Config HTTP/2 server customizations. -type HTTP2Config struct { - // Enable or disable HTTP/2 extension, default enable. - Enabled bool - - // H2C enables HTTP/2 over TCP - H2C bool - - // MaxConcurrentStreams defaults to 128. - MaxConcurrentStreams uint32 -} - -// InitDefaults sets default values for HTTP/2 configuration. -func (cfg *HTTP2Config) InitDefaults() error { - cfg.Enabled = true - cfg.MaxConcurrentStreams = 128 - - return nil -} - -// SSLConfig defines https server configuration. -type SSLConfig struct { - // Port to listen as HTTPS server, defaults to 443. - Port int - - // Redirect when enabled forces all http connections to switch to https. - Redirect bool - - // Key defined private server key. - Key string - - // Cert is https certificate. - Cert string - - // Root CA file - RootCA string -} - -// EnableHTTP is true when http server must run. -func (c *Config) EnableHTTP() bool { - return c.Address != "" -} - -// EnableTLS returns true if pool must listen TLS connections. -func (c *Config) EnableTLS() bool { - return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != "" -} - -// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL). -func (c *Config) EnableHTTP2() bool { - return c.HTTP2.Enabled -} - -// EnableH2C when HTTP/2 extension must be enabled on TCP. -func (c *Config) EnableH2C() bool { - return c.HTTP2.H2C -} - -// EnableFCGI is true when FastCGI server must be enabled. -func (c *Config) EnableFCGI() bool { - return c.FCGI.Address != "" -} - -// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. -func (c *Config) InitDefaults() error { - if c.Pool == nil { - // default pool - c.Pool = &poolImpl.Config{ - Debug: false, - NumWorkers: int64(runtime.NumCPU()), - MaxJobs: 1000, - AllocateTimeout: time.Second * 60, - DestroyTimeout: time.Second * 60, - Supervisor: nil, - } - } - - if c.HTTP2 == nil { - c.HTTP2 = &HTTP2Config{} - } - - if c.FCGI == nil { - c.FCGI = &FCGIConfig{} - } - - if c.Uploads == nil { - c.Uploads = &UploadsConfig{} - } - - if c.SSL == nil { - c.SSL = &SSLConfig{} - } - - if c.SSL.Port == 0 { - c.SSL.Port = 443 - } - - err := c.HTTP2.InitDefaults() - if err != nil { - return err - } - err = c.Uploads.InitDefaults() - if err != nil { - return err - } - - if c.TrustedSubnets == nil { - // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses - c.TrustedSubnets = []string{ - "10.0.0.0/8", - "127.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "::1/128", - "fc00::/7", - "fe80::/10", - } - } - - cidrs, err := ParseCIDRs(c.TrustedSubnets) - if err != nil { - return err - } - c.cidrs = cidrs - - return c.Valid() -} - -func ParseCIDRs(subnets []string) (Cidrs, error) { - c := make(Cidrs, 0, len(subnets)) - for _, cidr := range subnets { - _, cr, err := net.ParseCIDR(cidr) - if err != nil { - return nil, err - } - - c = append(c, cr) - } - - return c, nil -} - -// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For -func (c *Config) IsTrusted(ip string) bool { - if c.cidrs == nil { - return false - } - - i := net.ParseIP(ip) - if i == nil { - return false - } - - for _, cird := range c.cidrs { - if cird.Contains(i) { - return true - } - } - - return false -} - -// Valid validates the configuration. -func (c *Config) Valid() error { - const op = errors.Op("validation") - if c.Uploads == nil { - return errors.E(op, errors.Str("malformed uploads config")) - } - - if c.HTTP2 == nil { - return errors.E(op, errors.Str("malformed http2 config")) - } - - if c.Pool == nil { - return errors.E(op, "malformed pool config") - } - - if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() { - return errors.E(op, errors.Str("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)")) - } - - if c.Address != "" && !strings.Contains(c.Address, ":") { - return errors.E(op, errors.Str("malformed http server address")) - } - - if c.EnableTLS() { - if _, err := os.Stat(c.SSL.Key); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("key file '%s' does not exists", c.SSL.Key)) - } - - return err - } - - if _, err := os.Stat(c.SSL.Cert); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("cert file '%s' does not exists", c.SSL.Cert)) - } - - return err - } - - // RootCA is optional, but if provided - check it - if c.SSL.RootCA != "" { - if _, err := os.Stat(c.SSL.RootCA); err != nil { - if os.IsNotExist(err) { - return errors.E(op, errors.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA)) - } - return err - } - } - } - - return nil -} diff --git a/plugins/http/constants.go b/plugins/http/constants.go deleted file mode 100644 index 773d1f46..00000000 --- a/plugins/http/constants.go +++ /dev/null @@ -1,6 +0,0 @@ -package http - -import "net/http" - -var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") -var TrailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/errors.go b/plugins/http/errors.go deleted file mode 100644 index fb8762ef..00000000 --- a/plugins/http/errors.go +++ /dev/null @@ -1,25 +0,0 @@ -// +build !windows - -package http - -import ( - "errors" - "net" - "os" - "syscall" -) - -// Broken pipe -var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") - -// handleWriteError just check if error was caused by aborted connection on linux -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if syscallErr.Err == syscall.EPIPE { - return errEPIPE - } - } - } - return err -} diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go deleted file mode 100644 index 3d0ba04c..00000000 --- a/plugins/http/errors_windows.go +++ /dev/null @@ -1,27 +0,0 @@ -// +build windows - -package http - -import ( - "errors" - "net" - "os" - "syscall" -) - -//Software caused connection abort. -//An established connection was aborted by the software in your host computer, -//possibly due to a data transmission time-out or protocol error. -var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") - -// handleWriteError just check if error was caused by aborted connection on windows -func handleWriteError(err error) error { - if netErr, ok2 := err.(*net.OpError); ok2 { - if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { - if syscallErr.Err == syscall.WSAECONNABORTED { - return errEPIPE - } - } - } - return err -} diff --git a/plugins/http/handler.go b/plugins/http/handler.go deleted file mode 100644 index 08cee661..00000000 --- a/plugins/http/handler.go +++ /dev/null @@ -1,240 +0,0 @@ -package http - -import ( - "net" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/hashicorp/go-multierror" - "github.com/spiral/errors" - "github.com/spiral/roadrunner-plugins/logger" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" -) - -const ( - // EventResponse thrown after the request been processed. See ErrorEvent as payload. - EventResponse = iota + 500 - - // EventError thrown on any non job error provided by road runner server. - EventError -) - -const MB = 1024 * 1024 - -type Handle interface { - AddListener(l events.EventListener) - ServeHTTP(w http.ResponseWriter, r *http.Request) -} - -// ErrorEvent represents singular http error event. -type ErrorEvent struct { - // Request contains client request, must not be stored. - Request *http.Request - - // Error - associated error, if any. - Error error - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ErrorEvent) Elapsed() time.Duration { - return e.elapsed -} - -// ResponseEvent represents singular http response event. -type ResponseEvent struct { - // Request contains client request, must not be stored. - Request *Request - - // Response contains service response. - Response *Response - - // event timings - start time.Time - elapsed time.Duration -} - -// Elapsed returns duration of the invocation. -func (e *ResponseEvent) Elapsed() time.Duration { - return e.elapsed -} - -// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, -// parsed files and query, payload will include parsed form dataTree (if any). -type handler struct { - maxRequestSize uint64 - uploads UploadsConfig - trusted Cidrs - log logger.Logger - pool pool.Pool - mul sync.Mutex - lsn events.EventListener -} - -func NewHandler(maxReqSize uint64, uploads UploadsConfig, trusted Cidrs, pool pool.Pool) (Handle, error) { - if pool == nil { - return nil, errors.E(errors.Str("pool should be initialized")) - } - return &handler{ - maxRequestSize: maxReqSize * MB, - uploads: uploads, - pool: pool, - trusted: trusted, - }, nil -} - -// Listen attaches handler event controller. -func (h *handler) AddListener(l events.EventListener) { - h.mul.Lock() - defer h.mul.Unlock() - - h.lsn = l -} - -// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. -func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - const op = errors.Op("ServeHTTP") - start := time.Now() - - // validating request size - if h.maxRequestSize != 0 { - err := h.maxSize(w, r, start, op) - if err != nil { - return - } - } - - req, err := NewRequest(r, h.uploads) - if err != nil { - h.handleError(w, r, err, start) - return - } - - // proxy IP resolution - h.resolveIP(req) - - req.Open(h.log) - defer req.Close(h.log) - - p, err := req.Payload() - if err != nil { - h.handleError(w, r, err, start) - return - } - - rsp, err := h.pool.Exec(p) - if err != nil { - h.handleError(w, r, err, start) - return - } - - resp, err := NewResponse(rsp) - if err != nil { - h.handleError(w, r, err, start) - return - } - - h.handleResponse(req, resp, start) - err = resp.Write(w) - if err != nil { - h.handleError(w, r, err, start) - } -} - -func (h *handler) maxSize(w http.ResponseWriter, r *http.Request, start time.Time, op errors.Op) error { - if length := r.Header.Get("content-length"); length != "" { - if size, err := strconv.ParseInt(length, 10, 64); err != nil { - h.handleError(w, r, err, start) - return err - } else if size > int64(h.maxRequestSize) { - h.handleError(w, r, errors.E(op, errors.Str("request body max size is exceeded")), start) - return err - } - } - return nil -} - -// handleError sends error. -func (h *handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) { - h.mul.Lock() - defer h.mul.Unlock() - // if pipe is broken, there is no sense to write the header - // in this case we just report about error - if err == errEPIPE { - h.throw(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) - return - } - err = multierror.Append(err) - // ResponseWriter is ok, write the error code - w.WriteHeader(500) - _, err2 := w.Write([]byte(err.Error())) - // error during the writing to the ResponseWriter - if err2 != nil { - err = multierror.Append(err2, err) - // concat original error with ResponseWriter error - h.throw(ErrorEvent{Request: r, Error: errors.E(err), start: start, elapsed: time.Since(start)}) - return - } - h.throw(ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) -} - -// handleResponse triggers response event. -func (h *handler) handleResponse(req *Request, resp *Response, start time.Time) { - h.throw(ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) -} - -// throw invokes event handler if any. -func (h *handler) throw(event interface{}) { - if h.lsn != nil { - h.lsn(event) - } -} - -// get real ip passing multiple proxy -func (h *handler) resolveIP(r *Request) { - if h.trusted.IsTrusted(r.RemoteAddr) == false { - return - } - - if r.Header.Get("X-Forwarded-For") != "" { - ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") - ipCount := len(ips) - - for i := ipCount - 1; i >= 0; i-- { - addr := strings.TrimSpace(ips[i]) - if net.ParseIP(addr) != nil { - r.RemoteAddr = addr - return - } - } - - return - } - - // The logic here is the following: - // In general case, we only expect X-Real-Ip header. If it exist, we get the IP address from header and set request Remote address - // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers - // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. - // CF-Connecting-IP is an Enterprise feature and we check it last in order. - // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string - if r.Header.Get("X-Real-Ip") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) - return - } - - if r.Header.Get("True-Client-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) - return - } - - if r.Header.Get("CF-Connecting-IP") != "" { - r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) - } -} diff --git a/plugins/http/parse.go b/plugins/http/parse.go deleted file mode 100644 index d4a1604b..00000000 --- a/plugins/http/parse.go +++ /dev/null @@ -1,147 +0,0 @@ -package http - -import ( - "net/http" -) - -// MaxLevel defines maximum tree depth for incoming request data and files. -const MaxLevel = 127 - -type dataTree map[string]interface{} -type fileTree map[string]interface{} - -// parseData parses incoming request body into data tree. -func parseData(r *http.Request) dataTree { - data := make(dataTree) - if r.PostForm != nil { - for k, v := range r.PostForm { - data.push(k, v) - } - } - - if r.MultipartForm != nil { - for k, v := range r.MultipartForm.Value { - data.push(k, v) - } - } - - return data -} - -// pushes value into data tree. -func (d dataTree) push(k string, v []string) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d dataTree) mount(i []string, v []string) { - if len(i) == 1 { - // single value context (last element) - d[i[0]] = v[len(v)-1] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(dataTree).mount(i[1:], v) - return - } - - d[i[0]] = make(dataTree) - d[i[0]].(dataTree).mount(i[1:], v) -} - -// parse incoming dataTree request into JSON (including contentMultipart form dataTree) -func parseUploads(r *http.Request, cfg UploadsConfig) *Uploads { - u := &Uploads{ - cfg: cfg, - tree: make(fileTree), - list: make([]*FileUpload, 0), - } - - for k, v := range r.MultipartForm.File { - files := make([]*FileUpload, 0, len(v)) - for _, f := range v { - files = append(files, NewUpload(f)) - } - - u.list = append(u.list, files...) - u.tree.push(k, files) - } - - return u -} - -// pushes new file upload into it's proper place. -func (d fileTree) push(k string, v []*FileUpload) { - keys := FetchIndexes(k) - if len(keys) <= MaxLevel { - d.mount(keys, v) - } -} - -// mount mounts data tree recursively. -func (d fileTree) mount(i []string, v []*FileUpload) { - if len(i) == 1 { - // single value context - d[i[0]] = v[0] - return - } - - if len(i) == 2 && i[1] == "" { - // non associated array of elements - d[i[0]] = v - return - } - - if p, ok := d[i[0]]; ok { - p.(fileTree).mount(i[1:], v) - return - } - - d[i[0]] = make(fileTree) - d[i[0]].(fileTree).mount(i[1:], v) -} - -// FetchIndexes parses input name and splits it into separate indexes list. -func FetchIndexes(s string) []string { - var ( - pos int - ch string - keys = make([]string, 1) - ) - - for _, c := range s { - ch = string(c) - switch ch { - case " ": - // ignore all spaces - continue - case "[": - pos = 1 - continue - case "]": - if pos == 1 { - keys = append(keys, "") - } - pos = 2 - default: - if pos == 1 || pos == 2 { - keys = append(keys, "") - } - - keys[len(keys)-1] += ch - pos = 0 - } - } - - return keys -} diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go deleted file mode 100644 index 301468d1..00000000 --- a/plugins/http/plugin.go +++ /dev/null @@ -1,528 +0,0 @@ -package http - -import ( - "context" - "crypto/tls" - "crypto/x509" - "fmt" - "io/ioutil" - "net/http" - "net/http/fcgi" - "net/url" - "strings" - "sync" - - "github.com/hashicorp/go-multierror" - "github.com/spiral/endure" - "github.com/spiral/errors" - "github.com/spiral/roadrunner-plugins/checker" - "github.com/spiral/roadrunner-plugins/config" - "github.com/spiral/roadrunner-plugins/logger" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/util" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" - "golang.org/x/sys/cpu" -) - -const ( - // PluginName declares plugin name. - PluginName = "http" - - // EventInitSSL thrown at moment of https initialization. SSL server passed as context. - EventInitSSL = 750 -) - -// Middleware interface -type Middleware interface { - Middleware(f http.Handler) http.HandlerFunc -} - -type middleware map[string]Middleware - -// Service manages pool, http servers. -type Plugin struct { - sync.RWMutex - - configurer config.Configurer - server server.Server - log logger.Logger - - cfg *Config - // middlewares to chain - mdwr middleware - - // Pool which attached to all servers - pool pool.Pool - - // servers RR handler - handler Handle - - // servers - http *http.Server - https *http.Server - fcgi *http.Server -} - -// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { - const op = errors.Op("http Init") - err := cfg.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - - err = s.cfg.InitDefaults() - if err != nil { - return errors.E(op, err) - } - - s.configurer = cfg - s.log = log - s.mdwr = make(map[string]Middleware) - - if !s.cfg.EnableHTTP() && !s.cfg.EnableTLS() && !s.cfg.EnableFCGI() { - return errors.E(op, errors.Disabled) - } - - s.pool, err = server.NewWorkerPool(context.Background(), poolImpl.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) - if err != nil { - return errors.E(op, err) - } - - s.server = server - - return nil -} - -func (s *Plugin) logCallback(event interface{}) { - if ev, ok := event.(ResponseEvent); ok { - s.log.Debug("http handler response received", "elapsed", ev.Elapsed().String(), "remote address", ev.Request.RemoteAddr) - } -} - -// Serve serves the svc. -func (s *Plugin) Serve() chan error { - s.Lock() - defer s.Unlock() - - const op = errors.Op("serve http") - errCh := make(chan error, 2) - - var err error - s.handler, err = NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.cidrs, - s.pool, - ) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - s.handler.AddListener(s.logCallback) - - if s.cfg.EnableHTTP() { - if s.cfg.EnableH2C() { - s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})} - } else { - s.http = &http.Server{Addr: s.cfg.Address, Handler: s} - } - } - - if s.cfg.EnableTLS() { - s.https = s.initSSL() - if s.cfg.SSL.RootCA != "" { - err = s.appendRootCa() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - } - - if s.cfg.EnableHTTP2() { - if err := s.initHTTP2(); err != nil { - errCh <- errors.E(op, err) - return errCh - } - } - } - - if s.cfg.EnableFCGI() { - s.fcgi = &http.Server{Handler: s} - } - - // apply middlewares before starting the server - if len(s.mdwr) > 0 { - s.addMiddlewares() - } - - if s.http != nil { - go func() { - httpErr := s.http.ListenAndServe() - if httpErr != nil && httpErr != http.ErrServerClosed { - errCh <- errors.E(op, httpErr) - return - } - }() - } - - if s.https != nil { - go func() { - httpErr := s.https.ListenAndServeTLS( - s.cfg.SSL.Cert, - s.cfg.SSL.Key, - ) - - if httpErr != nil && httpErr != http.ErrServerClosed { - errCh <- errors.E(op, httpErr) - return - } - }() - } - - if s.fcgi != nil { - go func() { - httpErr := s.serveFCGI() - if httpErr != nil && httpErr != http.ErrServerClosed { - errCh <- errors.E(op, httpErr) - return - } - }() - } - - return errCh -} - -// Stop stops the http. -func (s *Plugin) Stop() error { - s.Lock() - defer s.Unlock() - - var err error - if s.fcgi != nil { - err = s.fcgi.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the fcgi server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) - } - } - - if s.https != nil { - err = s.https.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the https server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) - } - } - - if s.http != nil { - err = s.http.Shutdown(context.Background()) - if err != nil && err != http.ErrServerClosed { - s.log.Error("error shutting down the http server", "error", err) - // write error and try to stop other transport - err = multierror.Append(err) - } - } - - s.pool.Destroy(context.Background()) - - return err -} - -// ServeHTTP handles connection using set of middleware and pool PSR-7 server. -func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if headerContainsUpgrade(r, s) { - http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) - return - } - - if s.redirect(w, r) { - return - } - - if s.https != nil && r.TLS != nil { - w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") - } - - r = attributes.Init(r) - // protect the case, when user send Reset and we are replacing handler with pool - s.RLock() - s.handler.ServeHTTP(w, r) - s.RUnlock() -} - -// Server returns associated pool workers -func (s *Plugin) Workers() []worker.BaseProcess { - return s.pool.Workers() -} - -func (s *Plugin) Name() string { - return PluginName -} - -func (s *Plugin) Reset() error { - s.Lock() - defer s.Unlock() - const op = errors.Op("http reset") - s.log.Info("HTTP plugin got restart request. Restarting...") - s.pool.Destroy(context.Background()) - s.pool = nil - - // re-read the config - err := s.configurer.UnmarshalKey(PluginName, &s.cfg) - if err != nil { - return errors.E(op, err) - } - - s.pool, err = s.server.NewWorkerPool(context.Background(), poolImpl.Config{ - Debug: s.cfg.Pool.Debug, - NumWorkers: s.cfg.Pool.NumWorkers, - MaxJobs: s.cfg.Pool.MaxJobs, - AllocateTimeout: s.cfg.Pool.AllocateTimeout, - DestroyTimeout: s.cfg.Pool.DestroyTimeout, - Supervisor: s.cfg.Pool.Supervisor, - }, s.cfg.Env, s.logCallback) - if err != nil { - return errors.E(op, err) - } - - s.log.Info("HTTP listeners successfully re-added") - - s.log.Info("HTTP workers Pool successfully restarted") - s.handler, err = NewHandler( - s.cfg.MaxRequestSize, - *s.cfg.Uploads, - s.cfg.cidrs, - s.pool, - ) - if err != nil { - return errors.E(op, err) - } - - s.log.Info("HTTP plugin successfully restarted") - return nil -} - -func (s *Plugin) Collects() []interface{} { - return []interface{}{ - s.AddMiddleware, - } -} - -func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) { - s.mdwr[name.Name()] = m -} - -// Status return status of the particular plugin -func (s *Plugin) Status() checker.Status { - workers := s.Workers() - for i := 0; i < len(workers); i++ { - if workers[i].State().IsActive() { - return checker.Status{ - Code: http.StatusOK, - } - } - } - // if there are no workers, threat this as error - return checker.Status{ - Code: http.StatusInternalServerError, - } -} - -func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) bool { - if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { - target := &url.URL{ - Scheme: "https", - Host: s.tlsAddr(r.Host, false), - Path: r.URL.Path, - RawQuery: r.URL.RawQuery, - } - - http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect) - return true - } - return false -} - -func headerContainsUpgrade(r *http.Request, s *Plugin) bool { - if _, ok := r.Header["Upgrade"]; ok { - // https://golang.org/pkg/net/http/#Hijacker - s.log.Error("server does not support Upgrade header") - return true - } - return false -} - -// append RootCA to the https server TLS config -func (s *Plugin) appendRootCa() error { - const op = errors.Op("append root CA") - rootCAs, err := x509.SystemCertPool() - if err != nil { - return nil - } - if rootCAs == nil { - rootCAs = x509.NewCertPool() - } - - CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) - if err != nil { - return err - } - - // should append our CA cert - ok := rootCAs.AppendCertsFromPEM(CA) - if !ok { - return errors.E(op, errors.Str("could not append Certs from PEM")) - } - // disable "G402 (CWE-295): TLS MinVersion too low. (Confidence: HIGH, Severity: HIGH)" - // #nosec G402 - cfg := &tls.Config{ - InsecureSkipVerify: false, - RootCAs: rootCAs, - } - s.http.TLSConfig = cfg - - return nil -} - -// Init https server -func (s *Plugin) initSSL() *http.Server { - var topCipherSuites []uint16 - var defaultCipherSuitesTLS13 []uint16 - - hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ - hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL - // Keep in sync with crypto/aes/cipher_s390x.go. - hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) - - hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X - - if hasGCMAsm { - // If AES-GCM hardware is provided then prioritise AES-GCM - // cipher suites. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } else { - // Without AES-GCM hardware, we put the ChaCha20-Poly1305 - // cipher suites first. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } - - DefaultCipherSuites := make([]uint16, 0, 22) - DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) - DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - - server := &http.Server{ - Addr: s.tlsAddr(s.cfg.Address, true), - Handler: s, - TLSConfig: &tls.Config{ - CurvePreferences: []tls.CurveID{ - tls.CurveP256, - tls.CurveP384, - tls.CurveP521, - tls.X25519, - }, - CipherSuites: DefaultCipherSuites, - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - }, - } - - return server -} - -// init http/2 server -func (s *Plugin) initHTTP2() error { - return http2.ConfigureServer(s.https, &http2.Server{ - MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams, - }) -} - -// serveFCGI starts FastCGI server. -func (s *Plugin) serveFCGI() error { - l, err := util.CreateListener(s.cfg.FCGI.Address) - if err != nil { - return err - } - - err = fcgi.Serve(l, s.fcgi.Handler) - if err != nil { - return err - } - - return nil -} - -// tlsAddr replaces listen or host port with port configured by SSL config. -func (s *Plugin) tlsAddr(host string, forcePort bool) string { - // remove current forcePort first - host = strings.Split(host, ":")[0] - - if forcePort || s.cfg.SSL.Port != 443 { - host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port) - } - - return host -} - -func (s *Plugin) addMiddlewares() { - if s.http != nil { - applyMiddlewares(s.http, s.mdwr, s.cfg.Middleware, s.log) - } - if s.https != nil { - applyMiddlewares(s.https, s.mdwr, s.cfg.Middleware, s.log) - } - - if s.fcgi != nil { - applyMiddlewares(s.fcgi, s.mdwr, s.cfg.Middleware, s.log) - } -} - -func applyMiddlewares(server *http.Server, middlewares map[string]Middleware, order []string, log logger.Logger) { - for i := 0; i < len(order); i++ { - if mdwr, ok := middlewares[order[i]]; ok { - server.Handler = mdwr.Middleware(server.Handler) - } else { - log.Warn("requested middleware does not exist", "requested", order[i]) - } - } -} diff --git a/plugins/http/request.go b/plugins/http/request.go deleted file mode 100644 index 16243bcc..00000000 --- a/plugins/http/request.go +++ /dev/null @@ -1,186 +0,0 @@ -package http - -import ( - "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "strings" - - j "github.com/json-iterator/go" - "github.com/spiral/roadrunner-plugins/logger" - "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -const ( - defaultMaxMemory = 32 << 20 // 32 MB - contentNone = iota + 900 - contentStream - contentMultipart - contentFormData -) - -// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. -type Request struct { - // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. - RemoteAddr string `json:"remoteAddr"` - - // Protocol includes HTTP protocol version. - Protocol string `json:"protocol"` - - // Method contains name of HTTP method used for the request. - Method string `json:"method"` - - // URI contains full request URI with scheme and query. - URI string `json:"uri"` - - // Header contains list of request headers. - Header http.Header `json:"headers"` - - // Cookies contains list of request cookies. - Cookies map[string]string `json:"cookies"` - - // RawQuery contains non parsed query string (to be parsed on php end). - RawQuery string `json:"rawQuery"` - - // Parsed indicates that request body has been parsed on RR end. - Parsed bool `json:"parsed"` - - // Uploads contains list of uploaded files, their names, sized and associations with temporary files. - Uploads *Uploads `json:"uploads"` - - // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. - Attributes map[string]interface{} `json:"attributes"` - - // request body can be parsedData or []byte - body interface{} -} - -func fetchIP(pair string) string { - if !strings.ContainsRune(pair, ':') { - return pair - } - - addr, _, _ := net.SplitHostPort(pair) - return addr -} - -// NewRequest creates new PSR7 compatible request using net/http request. -func NewRequest(r *http.Request, cfg UploadsConfig) (*Request, error) { - req := &Request{ - RemoteAddr: fetchIP(r.RemoteAddr), - Protocol: r.Proto, - Method: r.Method, - URI: uri(r), - Header: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), - } - - for _, c := range r.Cookies() { - if v, err := url.QueryUnescape(c.Value); err == nil { - req.Cookies[c.Name] = v - } - } - - switch req.contentType() { - case contentNone: - return req, nil - - case contentStream: - var err error - req.body, err = ioutil.ReadAll(r.Body) - return req, err - - case contentMultipart: - if err := r.ParseMultipartForm(defaultMaxMemory); err != nil { - return nil, err - } - - req.Uploads = parseUploads(r, cfg) - fallthrough - case contentFormData: - if err := r.ParseForm(); err != nil { - return nil, err - } - - req.body = parseData(r) - } - - req.Parsed = true - return req, nil -} - -// Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Open(log) -} - -// Close clears all temp file uploads -func (r *Request) Close(log logger.Logger) { - if r.Uploads == nil { - return - } - - r.Uploads.Clear(log) -} - -// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open -// files prior to calling this method. -func (r *Request) Payload() (payload.Payload, error) { - p := payload.Payload{} - - var err error - if p.Context, err = json.Marshal(r); err != nil { - return payload.Payload{}, err - } - - if r.Parsed { - if p.Body, err = json.Marshal(r.body); err != nil { - return payload.Payload{}, err - } - } else if r.body != nil { - p.Body = r.body.([]byte) - } - - return p, nil -} - -// contentType returns the payload content type. -func (r *Request) contentType() int { - if r.Method == "HEAD" || r.Method == "OPTIONS" { - return contentNone - } - - ct := r.Header.Get("content-type") - if strings.Contains(ct, "application/x-www-form-urlencoded") { - return contentFormData - } - - if strings.Contains(ct, "multipart/form-data") { - return contentMultipart - } - - return contentStream -} - -// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func uri(r *http.Request) string { - if r.URL.Host != "" { - return r.URL.String() - } - if r.TLS != nil { - return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) - } - - return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} diff --git a/plugins/http/response.go b/plugins/http/response.go deleted file mode 100644 index 17049ce1..00000000 --- a/plugins/http/response.go +++ /dev/null @@ -1,105 +0,0 @@ -package http - -import ( - "io" - "net/http" - "strings" - "sync" - - "github.com/spiral/roadrunner/v2/pkg/payload" -) - -// Response handles PSR7 response logic. -type Response struct { - // Status contains response status. - Status int `json:"status"` - - // Header contains list of response headers. - Headers map[string][]string `json:"headers"` - - // associated Body payload. - Body interface{} - sync.Mutex -} - -// NewResponse creates new response based on given pool payload. -func NewResponse(p payload.Payload) (*Response, error) { - r := &Response{Body: p.Body} - if err := json.Unmarshal(p.Context, r); err != nil { - return nil, err - } - - return r, nil -} - -// Write writes response headers, status and body into ResponseWriter. -func (r *Response) Write(w http.ResponseWriter) error { - // INFO map is the reference type in golang - p := handlePushHeaders(r.Headers) - if pusher, ok := w.(http.Pusher); ok { - for _, v := range p { - err := pusher.Push(v, nil) - if err != nil { - return err - } - } - } - - handleTrailers(r.Headers) - for n, h := range r.Headers { - for _, v := range h { - w.Header().Add(n, v) - } - } - - w.WriteHeader(r.Status) - - if data, ok := r.Body.([]byte); ok { - _, err := w.Write(data) - if err != nil { - return handleWriteError(err) - } - } - - if rc, ok := r.Body.(io.Reader); ok { - if _, err := io.Copy(w, rc); err != nil { - return err - } - } - - return nil -} - -func handlePushHeaders(h map[string][]string) []string { - var p []string - pushHeader, ok := h[http2pushHeaderKey] - if !ok { - return p - } - - p = append(p, pushHeader...) - - delete(h, http2pushHeaderKey) - - return p -} - -func handleTrailers(h map[string][]string) { - trailers, ok := h[TrailerHeaderKey] - if !ok { - return - } - - for _, tr := range trailers { - for _, n := range strings.Split(tr, ",") { - n = strings.Trim(n, "\t ") - if v, ok := h[n]; ok { - h["Trailer:"+n] = v - - delete(h, n) - } - } - } - - delete(h, TrailerHeaderKey) -} diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go deleted file mode 100644 index 1f14cc0d..00000000 --- a/plugins/http/uploads.go +++ /dev/null @@ -1,158 +0,0 @@ -package http - -import ( - "github.com/spiral/roadrunner-plugins/logger" - - "io" - "io/ioutil" - "mime/multipart" - "os" - "sync" -) - -const ( - // UploadErrorOK - no error, the file uploaded with success. - UploadErrorOK = 0 - - // UploadErrorNoFile - no file was uploaded. - UploadErrorNoFile = 4 - - // UploadErrorNoTmpDir - missing a temporary folder. - UploadErrorNoTmpDir = 6 - - // UploadErrorCantWrite - failed to write file to disk. - UploadErrorCantWrite = 7 - - // UploadErrorExtension - forbidden file extension. - UploadErrorExtension = 8 -) - -// Uploads tree manages uploaded files tree and temporary files. -type Uploads struct { - // associated temp directory and forbidden extensions. - cfg UploadsConfig - - // pre processed data tree for Uploads. - tree fileTree - - // flat list of all file Uploads. - list []*FileUpload -} - -// MarshalJSON marshal tree tree into JSON. -func (u *Uploads) MarshalJSON() ([]byte, error) { - return json.Marshal(u.tree) -} - -// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors -// will be handled individually. -func (u *Uploads) Open(log logger.Logger) { - var wg sync.WaitGroup - for _, f := range u.list { - wg.Add(1) - go func(f *FileUpload) { - defer wg.Done() - err := f.Open(u.cfg) - if err != nil && log != nil { - log.Error("error opening the file", "err", err) - } - }(f) - } - - wg.Wait() -} - -// Clear deletes all temporary files. -func (u *Uploads) Clear(log logger.Logger) { - for _, f := range u.list { - if f.TempFilename != "" && exists(f.TempFilename) { - err := os.Remove(f.TempFilename) - if err != nil && log != nil { - log.Error("error removing the file", "err", err) - } - } - } -} - -// FileUpload represents singular file NewUpload. -type FileUpload struct { - // ID contains filename specified by the client. - Name string `json:"name"` - - // Mime contains mime-type provided by the client. - Mime string `json:"mime"` - - // Size of the uploaded file. - Size int64 `json:"size"` - - // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php - Error int `json:"error"` - - // TempFilename points to temporary file location. - TempFilename string `json:"tmpName"` - - // associated file header - header *multipart.FileHeader -} - -// NewUpload wraps net/http upload into PRS-7 compatible structure. -func NewUpload(f *multipart.FileHeader) *FileUpload { - return &FileUpload{ - Name: f.Filename, - Mime: f.Header.Get("Content-Type"), - Error: UploadErrorOK, - header: f, - } -} - -// Open moves file content into temporary file available for PHP. -// NOTE: -// There is 2 deferred functions, and in case of getting 2 errors from both functions -// error from close of temp file would be overwritten by error from the main file -// STACK -// DEFER FILE CLOSE (2) -// DEFER TMP CLOSE (1) -func (f *FileUpload) Open(cfg UploadsConfig) (err error) { - if cfg.Forbids(f.Name) { - f.Error = UploadErrorExtension - return nil - } - - file, err := f.header.Open() - if err != nil { - f.Error = UploadErrorNoFile - return err - } - - defer func() { - // close the main file - err = file.Close() - }() - - tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") - if err != nil { - // most likely cause of this issue is missing tmp dir - f.Error = UploadErrorNoTmpDir - return err - } - - f.TempFilename = tmp.Name() - defer func() { - // close the temp file - err = tmp.Close() - }() - - if f.Size, err = io.Copy(tmp, file); err != nil { - f.Error = UploadErrorCantWrite - } - - return err -} - -// exists if file exists. -func exists(path string) bool { - if _, err := os.Stat(path); os.IsNotExist(err) { - return false - } - return true -} diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go deleted file mode 100644 index 4c20c8e8..00000000 --- a/plugins/http/uploads_config.go +++ /dev/null @@ -1,46 +0,0 @@ -package http - -import ( - "os" - "path" - "strings" -) - -// UploadsConfig describes file location and controls access to them. -type UploadsConfig struct { - // Dir contains name of directory to control access to. - Dir string - - // Forbid specifies list of file extensions which are forbidden for access. - // Example: .php, .exe, .bat, .htaccess and etc. - Forbid []string -} - -// InitDefaults sets missing values to their default values. -func (cfg *UploadsConfig) InitDefaults() error { - cfg.Forbid = []string{".php", ".exe", ".bat"} - cfg.Dir = os.TempDir() - return nil -} - -// TmpDir returns temporary directory. -func (cfg *UploadsConfig) TmpDir() string { - if cfg.Dir != "" { - return cfg.Dir - } - - return os.TempDir() -} - -// Forbids must return true if file extension is not allowed for the upload. -func (cfg *UploadsConfig) Forbids(filename string) bool { - ext := strings.ToLower(path.Ext(filename)) - - for _, v := range cfg.Forbid { - if ext == v { - return true - } - } - - return false -} diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go deleted file mode 100644 index 27139ae1..00000000 --- a/plugins/informer/interface.go +++ /dev/null @@ -1,8 +0,0 @@ -package informer - -import "github.com/spiral/roadrunner/v2/interfaces/worker" - -// Informer used to get workers from particular plugin or set of plugins -type Informer interface { - Workers() []worker.BaseProcess -} diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go deleted file mode 100644 index e2da7d86..00000000 --- a/plugins/informer/plugin.go +++ /dev/null @@ -1,55 +0,0 @@ -package informer - -import ( - "github.com/spiral/endure" - "github.com/spiral/errors" - "github.com/spiral/roadrunner-plugins/logger" - "github.com/spiral/roadrunner/v2/interfaces/worker" -) - -const PluginName = "informer" - -type Plugin struct { - registry map[string]Informer - log logger.Logger -} - -func (p *Plugin) Init(log logger.Logger) error { - p.registry = make(map[string]Informer) - p.log = log - return nil -} - -// Workers provides BaseProcess slice with workers for the requested plugin -func (p *Plugin) Workers(name string) ([]worker.BaseProcess, error) { - const op = errors.Op("get workers") - svc, ok := p.registry[name] - if !ok { - return nil, errors.E(op, errors.Errorf("no such service: %s", name)) - } - - return svc.Workers(), nil -} - -// CollectTarget resettable service. -func (p *Plugin) CollectTarget(name endure.Named, r Informer) error { - p.registry[name.Name()] = r - return nil -} - -// Collects declares services to be collected. -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectTarget, - } -} - -// Name of the service. -func (p *Plugin) Name() string { - return PluginName -} - -// RPCService returns associated rpc service. -func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, log: p.log} -} diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go deleted file mode 100644 index d32d4e3a..00000000 --- a/plugins/informer/rpc.go +++ /dev/null @@ -1,54 +0,0 @@ -package informer - -import ( - "github.com/spiral/roadrunner-plugins/logger" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/tools" -) - -type rpc struct { - srv *Plugin - log logger.Logger -} - -// WorkerList contains list of workers. -type WorkerList struct { - // Workers is list of workers. - Workers []tools.ProcessState `json:"workers"` -} - -// List all resettable services. -func (rpc *rpc) List(_ bool, list *[]string) error { - rpc.log.Debug("Started List method") - *list = make([]string, 0, len(rpc.srv.registry)) - - for name := range rpc.srv.registry { - *list = append(*list, name) - } - rpc.log.Debug("list of services", "list", *list) - - rpc.log.Debug("successfully finished List method") - return nil -} - -// Workers state of a given service. -func (rpc *rpc) Workers(service string, list *WorkerList) error { - rpc.log.Debug("started Workers method", "service", service) - workers, err := rpc.srv.Workers(service) - if err != nil { - return err - } - - list.Workers = make([]tools.ProcessState, 0) - for _, w := range workers { - ps, err := tools.WorkerProcessState(w.(worker.BaseProcess)) - if err != nil { - continue - } - - list.Workers = append(list.Workers, ps) - } - rpc.log.Debug("list of workers", "workers", list.Workers) - rpc.log.Debug("successfully finished Workers method") - return nil -} diff --git a/plugins/server/config.go b/plugins/server/config.go deleted file mode 100644 index 4bef3c5f..00000000 --- a/plugins/server/config.go +++ /dev/null @@ -1,39 +0,0 @@ -package server - -import ( - "time" -) - -// Config config combines factory, pool and cmd configurations. -type Config struct { - // Command to run as application. - Command string - - // User to run application under. - User string - - // Group to run application under. - Group string - - // Env represents application environment. - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. Defaults to 60s. - RelayTimeout time.Duration -} - -func (cfg *Config) InitDefaults() { - if cfg.Relay == "" { - cfg.Relay = "pipes" - } - - if cfg.RelayTimeout == 0 { - cfg.RelayTimeout = time.Second * 60 - } -} diff --git a/plugins/server/interface.go b/plugins/server/interface.go deleted file mode 100644 index 9c1079ea..00000000 --- a/plugins/server/interface.go +++ /dev/null @@ -1,20 +0,0 @@ -package server - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" -) - -type Env map[string]string - -// Server creates workers for the application. -type Server interface { - CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) - NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) -} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go deleted file mode 100644 index 3d90c95b..00000000 --- a/plugins/server/plugin.go +++ /dev/null @@ -1,229 +0,0 @@ -package server - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner-plugins/config" - "github.com/spiral/roadrunner-plugins/logger" - - // core imports - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/pipe" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/socket" - "github.com/spiral/roadrunner/v2/util" -) - -const PluginName = "server" - -// Plugin manages worker -type Plugin struct { - cfg Config - log logger.Logger - factory worker.Factory -} - -// Init application provider. -func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("Init") - err := cfg.UnmarshalKey(PluginName, &server.cfg) - if err != nil { - return errors.E(op, errors.Init, err) - } - server.cfg.InitDefaults() - server.log = log - - server.factory, err = server.initFactory() - if err != nil { - return errors.E(errors.Op("Init factory"), err) - } - - return nil -} - -// Name contains service name. -func (server *Plugin) Name() string { - return PluginName -} - -func (server *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh -} - -func (server *Plugin) Stop() error { - if server.factory == nil { - return nil - } - - return server.factory.Close() -} - -// CmdFactory provides worker command factory associated with given context. -func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { - const op = errors.Op("cmd factory") - var cmdArgs []string - - // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) - if len(cmdArgs) < 2 { - return nil, errors.E(op, errors.Str("should be in form of `php <script>")) - } - if cmdArgs[0] != "php" { - return nil, errors.E(op, errors.Str("first arg in command should be `php`")) - } - - _, err := os.Stat(cmdArgs[1]) - if err != nil { - return nil, errors.E(op, err) - } - return func() *exec.Cmd { - cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec - util.IsolateProcess(cmd) - - // if user is not empty, and OS is linux or macos - // execute php worker from that particular user - if server.cfg.User != "" { - err := util.ExecuteFromUser(cmd, server.cfg.User) - if err != nil { - return nil - } - } - - cmd.Env = server.setEnv(env) - - return cmd - }, nil -} - -// NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) { - const op = errors.Op("new worker") - - list := make([]events.EventListener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) - - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...) - if err != nil { - return nil, errors.E(op, err) - } - - return w, nil -} - -// NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) { - const op = errors.Op("server plugins new worker pool") - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - list := make([]events.EventListener, 0, len(listeners)) - list = append(list, server.collectPoolLogs) - - p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...)) - if err != nil { - return nil, errors.E(op, err) - } - - return p, nil -} - -// creates relay and worker factory. -func (server *Plugin) initFactory() (worker.Factory, error) { - const op = errors.Op("network factory init") - if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { - return pipe.NewPipeFactory(), nil - } - - dsn := strings.Split(server.cfg.Relay, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } - - lsn, err := util.CreateListener(server.cfg.Relay) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - switch dsn[0] { - // sockets group - case "unix": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil - case "tcp": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil - default: - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } -} - -func (server *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) - for k, v := range e { - env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) - } - - return env -} - -func (server *Plugin) collectPoolLogs(event interface{}) { - if we, ok := event.(events.PoolEvent); ok { - switch we.Event { - case events.EventMaxMemory: - server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventNoFreeWorkers: - server.log.Info("no free workers in pool", "error", we.Payload.(error).Error()) - case events.EventPoolError: - server.log.Info("pool error", "error", we.Payload.(error).Error()) - case events.EventSupervisorError: - server.log.Info("pool supervizor error", "error", we.Payload.(error).Error()) - case events.EventTTL: - server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerConstruct: - if _, ok := we.Payload.(error); ok { - server.log.Error("worker construction error", "error", we.Payload.(error).Error()) - return - } - server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerDestruct: - server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventExecTTL: - server.log.Info("EVENT EXEC TTL PLACEHOLDER") - case events.EventIdleTTL: - server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - } - } - - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) - case events.EventWorkerLog: - server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) - } - } -} - -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) - case events.EventWorkerLog: - server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) - } - } -} |