summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-15 12:55:12 +0300
committerValery Piashchynski <[email protected]>2020-11-15 12:55:12 +0300
commit5260639a7e95d4d4a36b46fd4b19f665a0a60ef9 (patch)
tree31b2a616b385eb2e207b1f6e0b1269d37f95af75
parenta7ba4df83b4f2c67a3a0fb9d1dd35663935c90be (diff)
Initial commit of http 2.0 plugin
-rw-r--r--plugins/http/attributes/attributes.go76
-rw-r--r--plugins/http/attributes/attributes_test.go79
-rw-r--r--plugins/http/config.go263
-rw-r--r--plugins/http/config_test.go331
-rw-r--r--plugins/http/constants.go6
-rw-r--r--plugins/http/errors.go25
-rw-r--r--plugins/http/errors_windows.go27
-rw-r--r--plugins/http/fcgi_test.go105
-rw-r--r--plugins/http/fixtures/server.crt15
-rw-r--r--plugins/http/fixtures/server.key9
-rw-r--r--plugins/http/h2c_test.go83
-rw-r--r--plugins/http/handler.go208
-rw-r--r--plugins/http/handler_test.go1961
-rw-r--r--plugins/http/parse.go147
-rw-r--r--plugins/http/parse_test.go52
-rw-r--r--plugins/http/request.go183
-rw-r--r--plugins/http/response.go107
-rw-r--r--plugins/http/response_test.go162
-rw-r--r--plugins/http/rpc.go34
-rw-r--r--plugins/http/rpc_test.go221
-rw-r--r--plugins/http/service.go427
-rw-r--r--plugins/http/service_test.go759
-rw-r--r--plugins/http/ssl_test.go254
-rw-r--r--plugins/http/uploads.go160
-rw-r--r--plugins/http/uploads_config.go45
-rw-r--r--plugins/http/uploads_config_test.go24
-rw-r--r--plugins/http/uploads_test.go434
27 files changed, 6197 insertions, 0 deletions
diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go
new file mode 100644
index 00000000..77d6ea69
--- /dev/null
+++ b/plugins/http/attributes/attributes.go
@@ -0,0 +1,76 @@
+package attributes
+
+import (
+ "context"
+ "errors"
+ "net/http"
+)
+
+type attrKey int
+
+const contextKey attrKey = iota
+
+type attrs map[string]interface{}
+
+func (v attrs) get(key string) interface{} {
+ if v == nil {
+ return ""
+ }
+
+ return v[key]
+}
+
+func (v attrs) set(key string, value interface{}) {
+ v[key] = value
+}
+
+func (v attrs) del(key string) {
+ delete(v, key)
+}
+
+// Init returns request with new context and attribute bag.
+func Init(r *http.Request) *http.Request {
+ return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{}))
+}
+
+// All returns all context attributes.
+func All(r *http.Request) map[string]interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return attrs{}
+ }
+
+ return v.(attrs)
+}
+
+// Get gets the value from request context. It replaces any existing
+// values.
+func Get(r *http.Request, key string) interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return nil
+ }
+
+ return v.(attrs).get(key)
+}
+
+// Set sets the key to value. It replaces any existing
+// values. Context specific.
+func Set(r *http.Request, key string, value interface{}) error {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return errors.New("unable to find `psr:attributes` context key")
+ }
+
+ v.(attrs).set(key, value)
+ return nil
+}
+
+// Delete deletes values associated with attribute key.
+func (v attrs) Delete(key string) {
+ if v == nil {
+ return
+ }
+
+ v.del(key)
+}
diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go
new file mode 100644
index 00000000..2360fd12
--- /dev/null
+++ b/plugins/http/attributes/attributes_test.go
@@ -0,0 +1,79 @@
+package attributes
+
+import (
+ "github.com/stretchr/testify/assert"
+ "net/http"
+ "testing"
+)
+
+func TestAllAttributes(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+
+ assert.Equal(t, All(r), map[string]interface{}{
+ "key": "value",
+ })
+}
+
+func TestAllAttributesNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestAllAttributesNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestGetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestGetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestGetAttributeNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestSetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestSetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
+ assert.Equal(t, Get(r, "key"), nil)
+}
diff --git a/plugins/http/config.go b/plugins/http/config.go
new file mode 100644
index 00000000..00f61652
--- /dev/null
+++ b/plugins/http/config.go
@@ -0,0 +1,263 @@
+package http
+
+import (
+ "errors"
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "net"
+ "os"
+ "strings"
+)
+
+// Config configures RoadRunner HTTP server.
+type Config struct {
+ // Port and port to handle as http server.
+ Address string
+
+ // SSL defines https server options.
+ SSL SSLConfig
+
+ // FCGI configuration. You can use FastCGI without HTTP server.
+ FCGI *FCGIConfig
+
+ // HTTP2 configuration
+ HTTP2 *HTTP2Config
+
+ // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
+ MaxRequestSize int64
+
+ // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For
+ TrustedSubnets []string
+ cidrs []*net.IPNet
+
+ // Uploads configures uploads configuration.
+ Uploads *UploadsConfig
+
+ // Workers configures rr server and worker pool.
+ Workers *roadrunner.ServerConfig
+}
+
+// FCGIConfig for FastCGI server.
+type FCGIConfig struct {
+ // Address and port to handle as http server.
+ Address string
+}
+
+// HTTP2Config HTTP/2 server customizations.
+type HTTP2Config struct {
+ // Enable or disable HTTP/2 extension, default enable.
+ Enabled bool
+
+ // H2C enables HTTP/2 over TCP
+ H2C bool
+
+ // MaxConcurrentStreams defaults to 128.
+ MaxConcurrentStreams uint32
+}
+
+// InitDefaults sets default values for HTTP/2 configuration.
+func (cfg *HTTP2Config) InitDefaults() error {
+ cfg.Enabled = true
+ cfg.MaxConcurrentStreams = 128
+
+ return nil
+}
+
+// SSLConfig defines https server configuration.
+type SSLConfig struct {
+ // Port to listen as HTTPS server, defaults to 443.
+ Port int
+
+ // Redirect when enabled forces all http connections to switch to https.
+ Redirect bool
+
+ // Key defined private server key.
+ Key string
+
+ // Cert is https certificate.
+ Cert string
+
+ // Root CA file
+ RootCA string
+}
+
+// EnableHTTP is true when http server must run.
+func (c *Config) EnableHTTP() bool {
+ return c.Address != ""
+}
+
+// EnableTLS returns true if rr must listen TLS connections.
+func (c *Config) EnableTLS() bool {
+ return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != ""
+}
+
+// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL).
+func (c *Config) EnableHTTP2() bool {
+ return c.HTTP2.Enabled
+}
+
+// EnableH2C when HTTP/2 extension must be enabled on TCP.
+func (c *Config) EnableH2C() bool {
+ return c.HTTP2.H2C
+}
+
+// EnableFCGI is true when FastCGI server must be enabled.
+func (c *Config) EnableFCGI() bool {
+ return c.FCGI.Address != ""
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if c.Workers == nil {
+ c.Workers = &roadrunner.ServerConfig{}
+ }
+
+ if c.HTTP2 == nil {
+ c.HTTP2 = &HTTP2Config{}
+ }
+
+ if c.FCGI == nil {
+ c.FCGI = &FCGIConfig{}
+ }
+
+ if c.Uploads == nil {
+ c.Uploads = &UploadsConfig{}
+ }
+
+ if c.SSL.Port == 0 {
+ c.SSL.Port = 443
+ }
+
+ err := c.HTTP2.InitDefaults()
+ if err != nil {
+ return err
+ }
+ err = c.Uploads.InitDefaults()
+ if err != nil {
+ return err
+ }
+ err = c.Workers.InitDefaults()
+ if err != nil {
+ return err
+ }
+
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ c.Workers.UpscaleDurations()
+
+ if c.TrustedSubnets == nil {
+ // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses
+ c.TrustedSubnets = []string{
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ }
+ }
+
+ if err := c.parseCIDRs(); err != nil {
+ return err
+ }
+
+ return c.Valid()
+}
+
+func (c *Config) parseCIDRs() error {
+ for _, cidr := range c.TrustedSubnets {
+ _, cr, err := net.ParseCIDR(cidr)
+ if err != nil {
+ return err
+ }
+
+ c.cidrs = append(c.cidrs, cr)
+ }
+
+ return nil
+}
+
+// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For
+func (c *Config) IsTrusted(ip string) bool {
+ if c.cidrs == nil {
+ return false
+ }
+
+ i := net.ParseIP(ip)
+ if i == nil {
+ return false
+ }
+
+ for _, cird := range c.cidrs {
+ if cird.Contains(i) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// Valid validates the configuration.
+func (c *Config) Valid() error {
+ if c.Uploads == nil {
+ return errors.New("malformed uploads config")
+ }
+
+ if c.HTTP2 == nil {
+ return errors.New("malformed http2 config")
+ }
+
+ if c.Workers == nil {
+ return errors.New("malformed workers config")
+ }
+
+ if c.Workers.Pool == nil {
+ return errors.New("malformed workers config (pool config is missing)")
+ }
+
+ if err := c.Workers.Pool.Valid(); err != nil {
+ return err
+ }
+
+ if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() {
+ return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)")
+ }
+
+ if c.Address != "" && !strings.Contains(c.Address, ":") {
+ return errors.New("malformed http server address")
+ }
+
+ if c.EnableTLS() {
+ if _, err := os.Stat(c.SSL.Key); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("key file '%s' does not exists", c.SSL.Key)
+ }
+
+ return err
+ }
+
+ if _, err := os.Stat(c.SSL.Cert); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("cert file '%s' does not exists", c.SSL.Cert)
+ }
+
+ return err
+ }
+
+ // RootCA is optional, but if provided - check it
+ if c.SSL.RootCA != "" {
+ if _, err := os.Stat(c.SSL.RootCA); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA)
+ }
+ return err
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go
new file mode 100644
index 00000000..d95e0995
--- /dev/null
+++ b/plugins/http/config_test.go
@@ -0,0 +1,331 @@
+package http
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+ "time"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Unmarshal([]byte(cfg.cfg), out)
+}
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := &mockCfg{`{"address": "localhost:8080"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Valid(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.NoError(t, cfg.Valid())
+}
+
+func Test_Trusted_Subnets(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ TrustedSubnets: []string{"200.1.0.0/16"},
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.NoError(t, cfg.parseCIDRs())
+
+ assert.True(t, cfg.IsTrusted("200.1.0.10"))
+ assert.False(t, cfg.IsTrusted("127.0.0.0.1"))
+}
+
+func Test_Trusted_Subnets_Err(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ TrustedSubnets: []string{"200.1.0.0"},
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.parseCIDRs())
+}
+
+func Test_Config_Valid_SSL(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ SSL: SSLConfig{
+ Cert: "fixtures/server.crt",
+ Key: "fixtures/server.key",
+ },
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"}))
+
+ assert.NoError(t, cfg.Valid())
+ assert.True(t, cfg.EnableTLS())
+ assert.Equal(t, 443, cfg.SSL.Port)
+}
+
+func Test_Config_SSL_No_key(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ SSL: SSLConfig{
+ Cert: "fixtures/server.crt",
+ },
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_SSL_No_Cert(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ SSL: SSLConfig{
+ Key: "fixtures/server.key",
+ },
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_NoUploads(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_NoHTTP2(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 0,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_NoWorkers(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_NoPool(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 0,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_DeadPool(t *testing.T) {
+ cfg := &Config{
+ Address: ":8080",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
+
+func Test_Config_InvalidAddress(t *testing.T) {
+ cfg := &Config{
+ Address: "unexpected_address",
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ HTTP2: &HTTP2Config{
+ Enabled: true,
+ },
+ Workers: &roadrunner.ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ },
+ }
+
+ assert.Error(t, cfg.Valid())
+}
diff --git a/plugins/http/constants.go b/plugins/http/constants.go
new file mode 100644
index 00000000..a25f52a4
--- /dev/null
+++ b/plugins/http/constants.go
@@ -0,0 +1,6 @@
+package http
+
+import "net/http"
+
+var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push")
+var trailerHeaderKey = http.CanonicalHeaderKey("trailer")
diff --git a/plugins/http/errors.go b/plugins/http/errors.go
new file mode 100644
index 00000000..fb8762ef
--- /dev/null
+++ b/plugins/http/errors.go
@@ -0,0 +1,25 @@
+// +build !windows
+
+package http
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+// Broken pipe
+var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer")
+
+// handleWriteError just check if error was caused by aborted connection on linux
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.EPIPE {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go
new file mode 100644
index 00000000..3d0ba04c
--- /dev/null
+++ b/plugins/http/errors_windows.go
@@ -0,0 +1,27 @@
+// +build windows
+
+package http
+
+import (
+ "errors"
+ "net"
+ "os"
+ "syscall"
+)
+
+//Software caused connection abort.
+//An established connection was aborted by the software in your host computer,
+//possibly due to a data transmission time-out or protocol error.
+var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer")
+
+// handleWriteError just check if error was caused by aborted connection on windows
+func handleWriteError(err error) error {
+ if netErr, ok2 := err.(*net.OpError); ok2 {
+ if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 {
+ if syscallErr.Err == syscall.WSAECONNABORTED {
+ return errEPIPE
+ }
+ }
+ }
+ return err
+}
diff --git a/plugins/http/fcgi_test.go b/plugins/http/fcgi_test.go
new file mode 100644
index 00000000..e68b2e7f
--- /dev/null
+++ b/plugins/http/fcgi_test.go
@@ -0,0 +1,105 @@
+package http
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "github.com/yookoala/gofast"
+ "io/ioutil"
+ "net/http/httptest"
+ "testing"
+ "time"
+)
+
+func Test_FCGI_Service_Echo(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "fcgi": {
+ "address": "tcp://0.0.0.0:6082"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() { assert.NoError(t, c.Serve()) }()
+ time.Sleep(time.Second * 1)
+
+ fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6082")
+
+ fcgiHandler := gofast.NewHandler(
+ gofast.BasicParamsMap(gofast.BasicSession),
+ gofast.SimpleClientFactory(fcgiConnFactory, 0),
+ )
+
+ w := httptest.NewRecorder()
+ req := httptest.NewRequest("GET", "http://site.local/?hello=world", nil)
+ fcgiHandler.ServeHTTP(w, req)
+
+ body, err := ioutil.ReadAll(w.Result().Body)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, w.Result().StatusCode)
+ assert.Equal(t, "WORLD", string(body))
+ c.Stop()
+}
+
+func Test_FCGI_Service_Request_Uri(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "fcgi": {
+ "address": "tcp://0.0.0.0:6083"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php request-uri pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() { assert.NoError(t, c.Serve()) }()
+ time.Sleep(time.Second * 1)
+
+ fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6083")
+
+ fcgiHandler := gofast.NewHandler(
+ gofast.BasicParamsMap(gofast.BasicSession),
+ gofast.SimpleClientFactory(fcgiConnFactory, 0),
+ )
+
+ w := httptest.NewRecorder()
+ req := httptest.NewRequest("GET", "http://site.local/hello-world", nil)
+ fcgiHandler.ServeHTTP(w, req)
+
+ body, err := ioutil.ReadAll(w.Result().Body)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, w.Result().StatusCode)
+ assert.Equal(t, "http://site.local/hello-world", string(body))
+ c.Stop()
+}
diff --git a/plugins/http/fixtures/server.crt b/plugins/http/fixtures/server.crt
new file mode 100644
index 00000000..24d67fd7
--- /dev/null
+++ b/plugins/http/fixtures/server.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICTTCCAdOgAwIBAgIJAOKyUd+llTRKMAoGCCqGSM49BAMCMGMxCzAJBgNVBAYT
+AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv
+MRMwEQYDVQQKDApSb2FkUnVubmVyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTgw
+OTMwMTMzNDUzWhcNMjgwOTI3MTMzNDUzWjBjMQswCQYDVQQGEwJVUzETMBEGA1UE
+CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzETMBEGA1UECgwK
+Um9hZFJ1bm5lcjESMBAGA1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE
+ACIDYgAEVnbShsM+l5RR3wfWWmGhzuFGwNzKCk7i9xyobDIyBUxG/UUSfj7KKlUX
+puDnDEtF5xXcepl744CyIAYFLOXHb5WqI4jCOzG0o9f/00QQ4bQudJOdbqV910QF
+C2vb7Fxro1MwUTAdBgNVHQ4EFgQU9xUexnbB6ORKayA7Pfjzs33otsAwHwYDVR0j
+BBgwFoAU9xUexnbB6ORKayA7Pfjzs33otsAwDwYDVR0TAQH/BAUwAwEB/zAKBggq
+hkjOPQQDAgNoADBlAjEAue3HhR/MUhxoa9tSDBtOJT3FYbDQswrsdqBTz97CGKst
+e7XeZ3HMEvEXy0hGGEMhAjAqcD/4k9vViVppgWFtkk6+NFbm+Kw/QeeAiH5FgFSj
+8xQcb+b7nPwNLp3JOkXkVd4=
+-----END CERTIFICATE-----
diff --git a/plugins/http/fixtures/server.key b/plugins/http/fixtures/server.key
new file mode 100644
index 00000000..7501dd46
--- /dev/null
+++ b/plugins/http/fixtures/server.key
@@ -0,0 +1,9 @@
+-----BEGIN EC PARAMETERS-----
+BgUrgQQAIg==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIGkAgEBBDCQP8utxNbHR6xZOLAJgUhn88r6IrPqmN0MsgGJM/jePB+T9UhkmIU8
+PMm2HeScbcugBwYFK4EEACKhZANiAARWdtKGwz6XlFHfB9ZaYaHO4UbA3MoKTuL3
+HKhsMjIFTEb9RRJ+PsoqVRem4OcMS0XnFdx6mXvjgLIgBgUs5cdvlaojiMI7MbSj
+1//TRBDhtC50k51upX3XRAULa9vsXGs=
+-----END EC PRIVATE KEY-----
diff --git a/plugins/http/h2c_test.go b/plugins/http/h2c_test.go
new file mode 100644
index 00000000..f17538bc
--- /dev/null
+++ b/plugins/http/h2c_test.go
@@ -0,0 +1,83 @@
+package http
+
+import (
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Service_H2C(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "address": ":6029",
+ "http2": {"h2c":true},
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error serving: %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("PRI", "http://localhost:6029?hello=world", nil)
+ if err != nil {
+ return err
+ }
+
+ req.Header.Add("Upgrade", "h2c")
+ req.Header.Add("Connection", "HTTP2-Settings")
+ req.Header.Add("HTTP2-Settings", "")
+
+ r, err2 := http.DefaultClient.Do(req)
+ if err2 != nil {
+ return err2
+ }
+
+ assert.Equal(t, "101 Switching Protocols", r.Status)
+
+ err3 := r.Body.Close()
+ if err3 != nil {
+ return err3
+ }
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/plugins/http/handler.go b/plugins/http/handler.go
new file mode 100644
index 00000000..eca05483
--- /dev/null
+++ b/plugins/http/handler.go
@@ -0,0 +1,208 @@
+package http
+
+import (
+ "fmt"
+ "net"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+)
+
+const (
+ // EventResponse thrown after the request been processed. See ErrorEvent as payload.
+ EventResponse = iota + 500
+
+ // EventError thrown on any non job error provided by road runner server.
+ EventError
+)
+
+// ErrorEvent represents singular http error event.
+type ErrorEvent struct {
+ // Request contains client request, must not be stored.
+ Request *http.Request
+
+ // Error - associated error, if any.
+ Error error
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ErrorEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// ResponseEvent represents singular http response event.
+type ResponseEvent struct {
+ // Request contains client request, must not be stored.
+ Request *Request
+
+ // Response contains service response.
+ Response *Response
+
+ // event timings
+ start time.Time
+ elapsed time.Duration
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ResponseEvent) Elapsed() time.Duration {
+ return e.elapsed
+}
+
+// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
+// parsed files and query, payload will include parsed form dataTree (if any).
+type Handler struct {
+ cfg *Config
+ log *logrus.Logger
+ rr *roadrunner.Server
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
+}
+
+// Listen attaches handler event controller.
+func (h *Handler) Listen(l func(event int, ctx interface{})) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ h.lsn = l
+}
+
+// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
+
+ // validating request size
+ if h.cfg.MaxRequestSize != 0 {
+ if length := r.Header.Get("content-length"); length != "" {
+ if size, err := strconv.ParseInt(length, 10, 64); err != nil {
+ h.handleError(w, r, err, start)
+ return
+ } else if size > h.cfg.MaxRequestSize*1024*1024 {
+ h.handleError(w, r, errors.New("request body max size is exceeded"), start)
+ return
+ }
+ }
+ }
+
+ req, err := NewRequest(r, h.cfg.Uploads)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ // proxy IP resolution
+ h.resolveIP(req)
+
+ req.Open(h.log)
+ defer req.Close(h.log)
+
+ p, err := req.Payload()
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ rsp, err := h.rr.Exec(p)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ resp, err := NewResponse(rsp)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ return
+ }
+
+ h.handleResponse(req, resp, start)
+ err = resp.Write(w)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ }
+}
+
+// handleError sends error.
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) {
+ // if pipe is broken, there is no sense to write the header
+ // in this case we just report about error
+ if err == errEPIPE {
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+ return
+ }
+ // ResponseWriter is ok, write the error code
+ w.WriteHeader(500)
+ _, err2 := w.Write([]byte(err.Error()))
+ // error during the writing to the ResponseWriter
+ if err2 != nil {
+ // concat original error with ResponseWriter error
+ h.throw(EventError, &ErrorEvent{Request: r, Error: errors.New(fmt.Sprintf("error: %v, during handle this error, ResponseWriter error occurred: %v", err, err2)), start: start, elapsed: time.Since(start)})
+ return
+ }
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+}
+
+// handleResponse triggers response event.
+func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) {
+ h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)})
+}
+
+// throw invokes event handler if any.
+func (h *Handler) throw(event int, ctx interface{}) {
+ h.mul.Lock()
+ defer h.mul.Unlock()
+
+ if h.lsn != nil {
+ h.lsn(event, ctx)
+ }
+}
+
+// get real ip passing multiple proxy
+func (h *Handler) resolveIP(r *Request) {
+ if !h.cfg.IsTrusted(r.RemoteAddr) {
+ return
+ }
+
+ if r.Header.Get("X-Forwarded-For") != "" {
+ ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",")
+ ipCount := len(ips)
+
+ for i := ipCount - 1; i >= 0; i-- {
+ addr := strings.TrimSpace(ips[i])
+ if net.ParseIP(addr) != nil {
+ r.RemoteAddr = addr
+ return
+ }
+ }
+
+ return
+ }
+
+ // The logic here is the following:
+ // In general case, we only expect X-Real-Ip header. If it exist, we get the IP addres from header and set request Remote address
+ // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers
+ // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF.
+ // CF-Connecting-IP is an Enterprise feature and we check it last in order.
+ // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string
+ if r.Header.Get("X-Real-Ip") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip"))
+ return
+ }
+
+ if r.Header.Get("True-Client-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP"))
+ return
+ }
+
+ if r.Header.Get("CF-Connecting-IP") != "" {
+ r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP"))
+ }
+}
diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go
new file mode 100644
index 00000000..cb1cd728
--- /dev/null
+++ b/plugins/http/handler_test.go
@@ -0,0 +1,1961 @@
+package http
+
+import (
+ "bytes"
+ "context"
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "runtime"
+ "strings"
+ "testing"
+ "time"
+)
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
+ return string(b), r, err
+}
+
+// get request and return body
+func getHeader(url string, h map[string]string) (string, *http.Response, error) {
+ req, err := http.NewRequest("GET", url, bytes.NewBuffer(nil))
+ if err != nil {
+ return "", nil, err
+ }
+
+ for k, v := range h {
+ req.Header.Set(k, v)
+ }
+
+ r, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return "", nil, err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
+ return string(b), r, err
+}
+
+func TestHandler_Echo(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func Test_HandlerErrors(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ wr := httptest.NewRecorder()
+ rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data")))
+
+ h.ServeHTTP(wr, rq)
+ assert.Equal(t, 500, wr.Code)
+}
+
+func Test_Handler_JSON_error(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ wr := httptest.NewRecorder()
+ rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd")))
+ rq.Header.Add("Content-Type", "application/json")
+ rq.Header.Add("Content-Size", "3")
+
+ h.ServeHTTP(wr, rq)
+ assert.Equal(t, 500, wr.Code)
+}
+
+func TestHandler_Headers(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php header pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8078", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
+
+ req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil)
+ assert.NoError(t, err)
+
+ req.Header.Add("input", "sample")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "world", r.Header.Get("Header"))
+ assert.Equal(t, "SAMPLE", string(b))
+}
+
+func TestHandler_Empty_User_Agent(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php user-agent pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8088", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ assert.NoError(t, err)
+
+ req.Header.Add("user-agent", "")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "", string(b))
+}
+
+func TestHandler_User_Agent(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php user-agent pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8088", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
+ assert.NoError(t, err)
+
+ req.Header.Add("User-Agent", "go-agent")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "go-agent", string(b))
+}
+
+func TestHandler_Cookies(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php cookie pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8079", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("GET", "http://localhost:8079", nil)
+ assert.NoError(t, err)
+
+ req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"})
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "INPUT-VALUE", string(b))
+
+ for _, c := range r.Cookies() {
+ assert.Equal(t, "output", c.Name)
+ assert.Equal(t, "cookie-output", c.Value)
+ }
+}
+
+func TestHandler_JsonPayload_POST(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8090", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest(
+ "POST",
+ "http://localhost"+hs.Addr,
+ bytes.NewBufferString(`{"key":"value"}`),
+ )
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestHandler_JsonPayload_PUT(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8081", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestHandler_JsonPayload_PATCH(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php payload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8082", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/json")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, `{"value":"key"}`, string(b))
+}
+
+func TestHandler_FormData_POST(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8083", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_FormData_POST_Overwrite(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8083", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value1")
+ form.Add("key", "value2")
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"key":"value2","arr":{"x":{"y":null}}}`, string(b))
+}
+
+func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8083", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_FormData_PUT(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8084", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_FormData_PATCH(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8085", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ form := url.Values{}
+
+ form.Add("key", "value")
+ form.Add("name[]", "name1")
+ form.Add("name[]", "name2")
+ form.Add("name[]", "name3")
+ form.Add("arr[x][y][z]", "y")
+ form.Add("arr[x][y][e]", "f")
+ form.Add("arr[c]p", "l")
+ form.Add("arr[c]z", "")
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode()))
+ assert.NoError(t, err)
+
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_Multipart_POST(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8019", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name1")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name2")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][e]", "f")
+
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_Multipart_PUT(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8020", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name1")
+
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name2")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][e]", "f")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
+
+ req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_Multipart_PATCH(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php data pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name1")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name2")
+
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][e]", "f")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
+
+ req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
+}
+
+func TestHandler_Error(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php error pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ _, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestHandler_Error2(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php error2 pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ _, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestHandler_Error3(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ b2 := &bytes.Buffer{}
+ for i := 0; i < 1024*1024; i++ {
+ b2.Write([]byte(" "))
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
+
+ assert.NoError(t, err)
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestHandler_ResponseDuration(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ gotresp := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventResponse {
+ c := ctx.(*ResponseEvent)
+
+ if c.Elapsed() > 0 {
+ close(gotresp)
+ }
+ }
+ })
+
+ body, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-gotresp
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestHandler_ResponseDurationDelayed(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echoDelay pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ gotresp := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventResponse {
+ c := ctx.(*ResponseEvent)
+
+ if c.Elapsed() > time.Second {
+ close(gotresp)
+ }
+ }
+ })
+
+ body, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-gotresp
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestHandler_ErrorDuration(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php error pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ goterr := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventError {
+ c := ctx.(*ErrorEvent)
+
+ if c.Elapsed() > 0 {
+ close(goterr)
+ }
+ }
+ })
+
+ _, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-goterr
+
+ assert.Equal(t, 500, r.StatusCode)
+}
+
+func TestHandler_IP(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ TrustedSubnets: []string{
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php ip pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := get("http://127.0.0.1:8177/")
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "127.0.0.1", body)
+}
+
+func TestHandler_XRealIP(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ TrustedSubnets: []string{
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php ip pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+ "X-Real-Ip": "200.0.0.1",
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "200.0.0.1", body)
+}
+
+func TestHandler_XForwardedFor(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ TrustedSubnets: []string{
+ "10.0.0.0/8",
+ "127.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "100.0.0.0/16",
+ "200.0.0.0/16",
+ "::1/128",
+ "fc00::/7",
+ "fe80::/10",
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php ip pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+ "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1",
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "101.0.0.1", body)
+
+ body, r, err = getHeader("http://127.0.0.1:8177/", map[string]string{
+ "X-Forwarded-For": "100.0.0.1, 200.0.0.1, 101.0.0.1, invalid",
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "101.0.0.1", body)
+}
+
+func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ TrustedSubnets: []string{
+ "10.0.0.0/8",
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php ip pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
+ "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1",
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "127.0.0.1", body)
+}
+
+func BenchmarkHandler_Listen_Echo(b *testing.B) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ err := h.rr.Start()
+ if err != nil {
+ b.Errorf("error starting the worker pool: error %v", err)
+ }
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ b.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ b.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ bb := "WORLD"
+ for n := 0; n < b.N; n++ {
+ r, err := http.Get("http://localhost:8177/?hello=world")
+ if err != nil {
+ b.Fail()
+ }
+ // Response might be nil here
+ if r != nil {
+ br, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ b.Errorf("error reading Body: error %v", err)
+ }
+ if string(br) != bb {
+ b.Fail()
+ }
+ err = r.Body.Close()
+ if err != nil {
+ b.Errorf("error closing the Body: error %v", err)
+ }
+ } else {
+ b.Errorf("got nil response")
+ }
+ }
+}
diff --git a/plugins/http/parse.go b/plugins/http/parse.go
new file mode 100644
index 00000000..9b58d328
--- /dev/null
+++ b/plugins/http/parse.go
@@ -0,0 +1,147 @@
+package http
+
+import (
+ "net/http"
+)
+
+// MaxLevel defines maximum tree depth for incoming request data and files.
+const MaxLevel = 127
+
+type dataTree map[string]interface{}
+type fileTree map[string]interface{}
+
+// parseData parses incoming request body into data tree.
+func parseData(r *http.Request) dataTree {
+ data := make(dataTree)
+ if r.PostForm != nil {
+ for k, v := range r.PostForm {
+ data.push(k, v)
+ }
+ }
+
+ if r.MultipartForm != nil {
+ for k, v := range r.MultipartForm.Value {
+ data.push(k, v)
+ }
+ }
+
+ return data
+}
+
+// pushes value into data tree.
+func (d dataTree) push(k string, v []string) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d dataTree) mount(i []string, v []string) {
+ if len(i) == 1 {
+ // single value context (last element)
+ d[i[0]] = v[len(v)-1]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(dataTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(dataTree)
+ d[i[0]].(dataTree).mount(i[1:], v)
+}
+
+// parse incoming dataTree request into JSON (including contentMultipart form dataTree)
+func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads {
+ u := &Uploads{
+ cfg: cfg,
+ tree: make(fileTree),
+ list: make([]*FileUpload, 0),
+ }
+
+ for k, v := range r.MultipartForm.File {
+ files := make([]*FileUpload, 0, len(v))
+ for _, f := range v {
+ files = append(files, NewUpload(f))
+ }
+
+ u.list = append(u.list, files...)
+ u.tree.push(k, files)
+ }
+
+ return u
+}
+
+// pushes new file upload into it's proper place.
+func (d fileTree) push(k string, v []*FileUpload) {
+ keys := fetchIndexes(k)
+ if len(keys) <= MaxLevel {
+ d.mount(keys, v)
+ }
+}
+
+// mount mounts data tree recursively.
+func (d fileTree) mount(i []string, v []*FileUpload) {
+ if len(i) == 1 {
+ // single value context
+ d[i[0]] = v[0]
+ return
+ }
+
+ if len(i) == 2 && i[1] == "" {
+ // non associated array of elements
+ d[i[0]] = v
+ return
+ }
+
+ if p, ok := d[i[0]]; ok {
+ p.(fileTree).mount(i[1:], v)
+ return
+ }
+
+ d[i[0]] = make(fileTree)
+ d[i[0]].(fileTree).mount(i[1:], v)
+}
+
+// fetchIndexes parses input name and splits it into separate indexes list.
+func fetchIndexes(s string) []string {
+ var (
+ pos int
+ ch string
+ keys = make([]string, 1)
+ )
+
+ for _, c := range s {
+ ch = string(c)
+ switch ch {
+ case " ":
+ // ignore all spaces
+ continue
+ case "[":
+ pos = 1
+ continue
+ case "]":
+ if pos == 1 {
+ keys = append(keys, "")
+ }
+ pos = 2
+ default:
+ if pos == 1 || pos == 2 {
+ keys = append(keys, "")
+ }
+
+ keys[len(keys)-1] += ch
+ pos = 0
+ }
+ }
+
+ return keys
+}
diff --git a/plugins/http/parse_test.go b/plugins/http/parse_test.go
new file mode 100644
index 00000000..f95a3f9d
--- /dev/null
+++ b/plugins/http/parse_test.go
@@ -0,0 +1,52 @@
+package http
+
+import "testing"
+
+var samples = []struct {
+ in string
+ out []string
+}{
+ {"key", []string{"key"}},
+ {"key[subkey]", []string{"key", "subkey"}},
+ {"key[subkey]value", []string{"key", "subkey", "value"}},
+ {"key[subkey][value]", []string{"key", "subkey", "value"}},
+ {"key[subkey][value][]", []string{"key", "subkey", "value", ""}},
+ {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}},
+ {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}},
+}
+
+func Test_FetchIndexes(t *testing.T) {
+ for _, tt := range samples {
+ t.Run(tt.in, func(t *testing.T) {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ t.Errorf("got %q, want %q", r, tt.out)
+ }
+ })
+ }
+}
+
+func BenchmarkConfig_FetchIndexes(b *testing.B) {
+ for _, tt := range samples {
+ for n := 0; n < b.N; n++ {
+ r := fetchIndexes(tt.in)
+ if !same(r, tt.out) {
+ b.Fail()
+ }
+ }
+ }
+}
+
+func same(in, out []string) bool {
+ if len(in) != len(out) {
+ return false
+ }
+
+ for i, v := range in {
+ if v != out[i] {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/plugins/http/request.go b/plugins/http/request.go
new file mode 100644
index 00000000..8da5440f
--- /dev/null
+++ b/plugins/http/request.go
@@ -0,0 +1,183 @@
+package http
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+
+ json "github.com/json-iterator/go"
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/http/attributes"
+)
+
+const (
+ defaultMaxMemory = 32 << 20 // 32 MB
+ contentNone = iota + 900
+ contentStream
+ contentMultipart
+ contentFormData
+)
+
+// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
+type Request struct {
+ // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
+ RemoteAddr string `json:"remoteAddr"`
+
+ // Protocol includes HTTP protocol version.
+ Protocol string `json:"protocol"`
+
+ // Method contains name of HTTP method used for the request.
+ Method string `json:"method"`
+
+ // URI contains full request URI with scheme and query.
+ URI string `json:"uri"`
+
+ // Header contains list of request headers.
+ Header http.Header `json:"headers"`
+
+ // Cookies contains list of request cookies.
+ Cookies map[string]string `json:"cookies"`
+
+ // RawQuery contains non parsed query string (to be parsed on php end).
+ RawQuery string `json:"rawQuery"`
+
+ // Parsed indicates that request body has been parsed on RR end.
+ Parsed bool `json:"parsed"`
+
+ // Uploads contains list of uploaded files, their names, sized and associations with temporary files.
+ Uploads *Uploads `json:"uploads"`
+
+ // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions.
+ Attributes map[string]interface{} `json:"attributes"`
+
+ // request body can be parsedData or []byte
+ body interface{}
+}
+
+func fetchIP(pair string) string {
+ if !strings.ContainsRune(pair, ':') {
+ return pair
+ }
+
+ addr, _, _ := net.SplitHostPort(pair)
+ return addr
+}
+
+// NewRequest creates new PSR7 compatible request using net/http request.
+func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
+ req = &Request{
+ RemoteAddr: fetchIP(r.RemoteAddr),
+ Protocol: r.Proto,
+ Method: r.Method,
+ URI: uri(r),
+ Header: r.Header,
+ Cookies: make(map[string]string),
+ RawQuery: r.URL.RawQuery,
+ Attributes: attributes.All(r),
+ }
+
+ for _, c := range r.Cookies() {
+ if v, err := url.QueryUnescape(c.Value); err == nil {
+ req.Cookies[c.Name] = v
+ }
+ }
+
+ switch req.contentType() {
+ case contentNone:
+ return req, nil
+
+ case contentStream:
+ req.body, err = ioutil.ReadAll(r.Body)
+ return req, err
+
+ case contentMultipart:
+ if err = r.ParseMultipartForm(defaultMaxMemory); err != nil {
+ return nil, err
+ }
+
+ req.Uploads = parseUploads(r, cfg)
+ fallthrough
+ case contentFormData:
+ if err = r.ParseForm(); err != nil {
+ return nil, err
+ }
+
+ req.body = parseData(r)
+ }
+
+ req.Parsed = true
+ return req, nil
+}
+
+// Open moves all uploaded files to temporary directory so it can be given to php later.
+func (r *Request) Open(log *logrus.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Open(log)
+}
+
+// Close clears all temp file uploads
+func (r *Request) Close(log *logrus.Logger) {
+ if r.Uploads == nil {
+ return
+ }
+
+ r.Uploads.Clear(log)
+}
+
+// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
+// files prior to calling this method.
+func (r *Request) Payload() (p *roadrunner.Payload, err error) {
+ p = &roadrunner.Payload{}
+
+ j := json.ConfigCompatibleWithStandardLibrary
+ if p.Context, err = j.Marshal(r); err != nil {
+ return nil, err
+ }
+
+ if r.Parsed {
+ if p.Body, err = j.Marshal(r.body); err != nil {
+ return nil, err
+ }
+ } else if r.body != nil {
+ p.Body = r.body.([]byte)
+ }
+
+ return p, nil
+}
+
+// contentType returns the payload content type.
+func (r *Request) contentType() int {
+ if r.Method == "HEAD" || r.Method == "OPTIONS" {
+ return contentNone
+ }
+
+ ct := r.Header.Get("content-type")
+ if strings.Contains(ct, "application/x-www-form-urlencoded") {
+ return contentFormData
+ }
+
+ if strings.Contains(ct, "multipart/form-data") {
+ return contentMultipart
+ }
+
+ return contentStream
+}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.URL.Host != "" {
+ return r.URL.String()
+ }
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/plugins/http/response.go b/plugins/http/response.go
new file mode 100644
index 00000000..f34754be
--- /dev/null
+++ b/plugins/http/response.go
@@ -0,0 +1,107 @@
+package http
+
+import (
+ "io"
+ "net/http"
+ "strings"
+
+ json "github.com/json-iterator/go"
+
+ "github.com/spiral/roadrunner"
+)
+
+
+// Response handles PSR7 response logic.
+type Response struct {
+ // Status contains response status.
+ Status int `json:"status"`
+
+ // Header contains list of response headers.
+ Headers map[string][]string `json:"headers"`
+
+ // associated body payload.
+ body interface{}
+}
+
+// NewResponse creates new response based on given rr payload.
+func NewResponse(p *roadrunner.Payload) (*Response, error) {
+ r := &Response{body: p.Body}
+ j := json.ConfigCompatibleWithStandardLibrary
+ if err := j.Unmarshal(p.Context, r); err != nil {
+ return nil, err
+ }
+
+ return r, nil
+}
+
+// Write writes response headers, status and body into ResponseWriter.
+func (r *Response) Write(w http.ResponseWriter) error {
+ // INFO map is the reference type in golang
+ p := handlePushHeaders(r.Headers)
+ if pusher, ok := w.(http.Pusher); ok {
+ for _, v := range p {
+ err := pusher.Push(v, nil)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ handleTrailers(r.Headers)
+ for n, h := range r.Headers {
+ for _, v := range h {
+ w.Header().Add(n, v)
+ }
+ }
+
+ w.WriteHeader(r.Status)
+
+ if data, ok := r.body.([]byte); ok {
+ _, err := w.Write(data)
+ if err != nil {
+ return handleWriteError(err)
+ }
+ }
+
+ if rc, ok := r.body.(io.Reader); ok {
+ if _, err := io.Copy(w, rc); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func handlePushHeaders(h map[string][]string) []string {
+ var p []string
+ pushHeader, ok := h[http2pushHeaderKey]
+ if !ok {
+ return p
+ }
+
+ p = append(p, pushHeader...)
+
+ delete(h, http2pushHeaderKey)
+
+ return p
+}
+
+func handleTrailers(h map[string][]string) {
+ trailers, ok := h[trailerHeaderKey]
+ if !ok {
+ return
+ }
+
+ for _, tr := range trailers {
+ for _, n := range strings.Split(tr, ",") {
+ n = strings.Trim(n, "\t ")
+ if v, ok := h[n]; ok {
+ h["Trailer:"+n] = v
+
+ delete(h, n)
+ }
+ }
+ }
+
+ delete(h, trailerHeaderKey)
+}
diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go
new file mode 100644
index 00000000..1f394276
--- /dev/null
+++ b/plugins/http/response_test.go
@@ -0,0 +1,162 @@
+package http
+
+import (
+ "bytes"
+ "errors"
+ "net/http"
+ "testing"
+
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+)
+
+type testWriter struct {
+ h http.Header
+ buf bytes.Buffer
+ wroteHeader bool
+ code int
+ err error
+ pushErr error
+ pushes []string
+}
+
+func (tw *testWriter) Header() http.Header { return tw.h }
+
+func (tw *testWriter) Write(p []byte) (int, error) {
+ if !tw.wroteHeader {
+ tw.WriteHeader(http.StatusOK)
+ }
+
+ n, e := tw.buf.Write(p)
+ if e == nil {
+ e = tw.err
+ }
+
+ return n, e
+}
+
+func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code }
+
+func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
+ tw.pushes = append(tw.pushes, target)
+
+ return tw.pushErr
+}
+
+func TestNewResponse_Error(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)})
+ assert.Error(t, err)
+ assert.Nil(t, r)
+}
+
+func TestNewResponse_Write(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ Body: []byte(`sample body`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "sample body", w.buf.String())
+}
+
+func TestNewResponse_Stream(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ // r is pointer, so, it might be nil
+ if r == nil {
+ t.Fatal("response is nil")
+ }
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, 301, w.code)
+ assert.Equal(t, "value", w.h.Get("key"))
+ assert.Equal(t, "hello world", w.buf.String())
+}
+
+func TestNewResponse_StreamError(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
+ })
+
+ // r is pointer, so, it might be nil
+ if r == nil {
+ t.Fatal("response is nil")
+ }
+
+ r.body = &bytes.Buffer{}
+ r.body.(*bytes.Buffer).WriteString("hello world")
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")}
+ assert.Error(t, r.Write(w))
+}
+
+func TestWrite_HandlesPush(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Nil(t, w.h["Http2-Push"])
+ assert.Equal(t, []string{"/test.js"}, w.pushes)
+}
+
+func TestWrite_HandlesTrailers(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Nil(t, w.h[trailerHeaderKey])
+ assert.Nil(t, w.h["foo"]) //nolint:golint,staticcheck
+ assert.Nil(t, w.h["baz"]) //nolint:golint,staticcheck
+
+ assert.Equal(t, "test", w.h.Get("Trailer:foo"))
+ assert.Equal(t, "demo", w.h.Get("Trailer:bar"))
+}
+
+func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
+ r, err := NewResponse(&roadrunner.Payload{
+ Context: []byte(
+ `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, r)
+
+ w := &testWriter{h: http.Header(make(map[string][]string))}
+ assert.NoError(t, r.Write(w))
+
+ assert.Equal(t, "a", w.h.Get("Trailer:foo"))
+ assert.Equal(t, "b", w.h.Get("Trailer:bar"))
+ assert.Equal(t, "c", w.h.Get("Trailer:baz"))
+}
diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go
new file mode 100644
index 00000000..7b38dece
--- /dev/null
+++ b/plugins/http/rpc.go
@@ -0,0 +1,34 @@
+package http
+
+import (
+ "github.com/pkg/errors"
+ "github.com/spiral/roadrunner/util"
+)
+
+type rpcServer struct{ svc *Service }
+
+// WorkerList contains list of workers.
+type WorkerList struct {
+ // Workers is list of workers.
+ Workers []*util.State `json:"workers"`
+}
+
+// Reset resets underlying RR worker pool and restarts all of it's workers.
+func (rpc *rpcServer) Reset(reset bool, r *string) error {
+ if rpc.svc == nil || rpc.svc.handler == nil {
+ return errors.New("http server is not running")
+ }
+
+ *r = "OK"
+ return rpc.svc.Server().Reset()
+}
+
+// Workers returns list of active workers and their stats.
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
+ if rpc.svc == nil || rpc.svc.handler == nil {
+ return errors.New("http server is not running")
+ }
+
+ r.Workers, err = util.ServerState(rpc.svc.Server())
+ return err
+}
diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go
new file mode 100644
index 00000000..e57a8699
--- /dev/null
+++ b/plugins/http/rpc_test.go
@@ -0,0 +1,221 @@
+package http
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "strconv"
+ "testing"
+ "time"
+)
+
+func Test_RPC(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":16031",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+
+ time.Sleep(time.Second)
+
+ res, _, err := get("http://localhost:16031")
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := ""
+ assert.NoError(t, cl.Call("http.Reset", true, &r))
+ assert.Equal(t, "OK", r)
+
+ res2, _, err := get("http://localhost:16031")
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
+ assert.NotEqual(t, res, res2)
+ c.Stop()
+}
+
+func Test_RPC_Unix(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ sock := `unix://` + os.TempDir() + `/rpc.unix`
+ j := json.ConfigCompatibleWithStandardLibrary
+ data, _ := j.Marshal(sock)
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":` + string(data) + `}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6032",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 500)
+
+ res, _, err := get("http://localhost:6032")
+ if err != nil {
+ c.Stop()
+ t.Fatal(err)
+ }
+ if ss.rr.Workers() != nil && len(ss.rr.Workers()) > 0 {
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
+ } else {
+ c.Stop()
+ t.Fatal("no workers initialized")
+ }
+
+ cl, err := rs.Client()
+ if err != nil {
+ c.Stop()
+ t.Fatal(err)
+ }
+
+ r := ""
+ assert.NoError(t, cl.Call("http.Reset", true, &r))
+ assert.Equal(t, "OK", r)
+
+ res2, _, err := get("http://localhost:6032")
+ if err != nil {
+ c.Stop()
+ t.Fatal(err)
+ }
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
+ assert.NotEqual(t, res, res2)
+ c.Stop()
+}
+
+func Test_Workers(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6033",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := &WorkerList{}
+ assert.NoError(t, cl.Call("http.Workers", true, &r))
+ assert.Len(t, r.Workers, 1)
+
+ assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid)
+ c.Stop()
+}
+
+func Test_Errors(t *testing.T) {
+ r := &rpcServer{nil}
+
+ assert.Error(t, r.Reset(true, nil))
+ assert.Error(t, r.Workers(true, nil))
+}
diff --git a/plugins/http/service.go b/plugins/http/service.go
new file mode 100644
index 00000000..25a10064
--- /dev/null
+++ b/plugins/http/service.go
@@ -0,0 +1,427 @@
+package http
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/fcgi"
+ "net/url"
+ "strings"
+ "sync"
+
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/env"
+ "github.com/spiral/roadrunner/service/http/attributes"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/util"
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/h2c"
+ "golang.org/x/sys/cpu"
+)
+
+const (
+ // ID contains default service name.
+ ID = "http"
+
+ // EventInitSSL thrown at moment of https initialization. SSL server passed as context.
+ EventInitSSL = 750
+)
+
+var couldNotAppendPemError = errors.New("could not append Certs from PEM")
+
+// http middleware type.
+type middleware func(f http.HandlerFunc) http.HandlerFunc
+
+// Service manages rr, http servers.
+type Service struct {
+ sync.Mutex
+ sync.WaitGroup
+
+ cfg *Config
+ log *logrus.Logger
+ cprod roadrunner.CommandProducer
+ env env.Environment
+ lsns []func(event int, ctx interface{})
+ mdwr []middleware
+
+ rr *roadrunner.Server
+ controller roadrunner.Controller
+ handler *Handler
+
+ http *http.Server
+ https *http.Server
+ fcgi *http.Server
+}
+
+// Attach attaches controller. Currently only one controller is supported.
+func (s *Service) Attach(w roadrunner.Controller) {
+ s.controller = w
+}
+
+// ProduceCommands changes the default command generator method
+func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) {
+ s.cprod = producer
+}
+
+// AddMiddleware adds new net/http mdwr.
+func (s *Service) AddMiddleware(m middleware) {
+ s.mdwr = append(s.mdwr, m)
+}
+
+// AddListener attaches server event controller.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
+// misconfiguration. Services must not be used without proper configuration pushed first.
+func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) {
+ s.cfg = cfg
+ s.log = log
+ s.env = e
+
+ if r != nil {
+ if err := r.Register(ID, &rpcServer{s}); err != nil {
+ return false, err
+ }
+ }
+
+ if !cfg.EnableHTTP() && !cfg.EnableTLS() && !cfg.EnableFCGI() {
+ return false, nil
+ }
+
+ return true, nil
+}
+
+// Serve serves the svc.
+func (s *Service) Serve() error {
+ s.Lock()
+
+ if s.env != nil {
+ if err := s.env.Copy(s.cfg.Workers); err != nil {
+ return nil
+ }
+ }
+
+ s.cfg.Workers.CommandProducer = s.cprod
+ s.cfg.Workers.SetEnv("RR_HTTP", "true")
+
+ s.rr = roadrunner.NewServer(s.cfg.Workers)
+ s.rr.Listen(s.throw)
+
+ if s.controller != nil {
+ s.rr.Attach(s.controller)
+ }
+
+ s.handler = &Handler{cfg: s.cfg, rr: s.rr}
+ s.handler.Listen(s.throw)
+
+ if s.cfg.EnableHTTP() {
+ if s.cfg.EnableH2C() {
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})}
+ } else {
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
+ }
+ }
+
+ if s.cfg.EnableTLS() {
+ s.https = s.initSSL()
+ if s.cfg.SSL.RootCA != "" {
+ err := s.appendRootCa()
+ if err != nil {
+ return err
+ }
+ }
+
+ if s.cfg.EnableHTTP2() {
+ if err := s.initHTTP2(); err != nil {
+ return err
+ }
+ }
+ }
+
+ if s.cfg.EnableFCGI() {
+ s.fcgi = &http.Server{Handler: s}
+ }
+
+ s.Unlock()
+
+ if err := s.rr.Start(); err != nil {
+ return err
+ }
+ defer s.rr.Stop()
+
+ err := make(chan error, 3)
+
+ if s.http != nil {
+ go func() {
+ httpErr := s.http.ListenAndServe()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
+ }()
+ }
+
+ if s.https != nil {
+ go func() {
+ httpErr := s.https.ListenAndServeTLS(
+ s.cfg.SSL.Cert,
+ s.cfg.SSL.Key,
+ )
+
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ return
+ }
+ err <- nil
+ }()
+ }
+
+ if s.fcgi != nil {
+ go func() {
+ httpErr := s.serveFCGI()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ return
+ }
+ err <- nil
+ }()
+ }
+ return <-err
+}
+
+// Stop stops the http.
+func (s *Service) Stop() {
+ s.Lock()
+ defer s.Unlock()
+
+ if s.fcgi != nil {
+ s.Add(1)
+ go func() {
+ defer s.Done()
+ err := s.fcgi.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ // Stop() error
+ // push error from goroutines to the channel and block unil error or success shutdown or timeout
+ s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err))
+ return
+ }
+ }()
+ }
+
+ if s.https != nil {
+ s.Add(1)
+ go func() {
+ defer s.Done()
+ err := s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err))
+ return
+ }
+ }()
+ }
+
+ if s.http != nil {
+ s.Add(1)
+ go func() {
+ defer s.Done()
+ err := s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err))
+ return
+ }
+ }()
+ }
+
+ s.Wait()
+}
+
+// Server returns associated rr server (if any).
+func (s *Service) Server() *roadrunner.Server {
+ s.Lock()
+ defer s.Unlock()
+
+ return s.rr
+}
+
+// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
+func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
+ target := &url.URL{
+ Scheme: "https",
+ Host: s.tlsAddr(r.Host, false),
+ Path: r.URL.Path,
+ RawQuery: r.URL.RawQuery,
+ }
+
+ http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect)
+ return
+ }
+
+ if s.https != nil && r.TLS != nil {
+ w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload")
+ }
+
+ r = attributes.Init(r)
+
+ // chaining middleware
+ f := s.handler.ServeHTTP
+ for _, m := range s.mdwr {
+ f = m(f)
+ }
+ f(w, r)
+}
+
+// append RootCA to the https server TLS config
+func (s *Service) appendRootCa() error {
+ rootCAs, err := x509.SystemCertPool()
+ if err != nil {
+ s.throw(EventInitSSL, nil)
+ return nil
+ }
+ if rootCAs == nil {
+ rootCAs = x509.NewCertPool()
+ }
+
+ CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA)
+ if err != nil {
+ s.throw(EventInitSSL, nil)
+ return err
+ }
+
+ // should append our CA cert
+ ok := rootCAs.AppendCertsFromPEM(CA)
+ if !ok {
+ return couldNotAppendPemError
+ }
+ config := &tls.Config{
+ InsecureSkipVerify: false,
+ RootCAs: rootCAs,
+ }
+ s.http.TLSConfig = config
+
+ return nil
+}
+
+// Init https server
+func (s *Service) initSSL() *http.Server {
+ var topCipherSuites []uint16
+ var defaultCipherSuitesTLS13 []uint16
+
+ hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ
+ hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
+ // Keep in sync with crypto/aes/cipher_s390x.go.
+ hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM)
+
+ hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
+
+ if hasGCMAsm {
+ // If AES-GCM hardware is provided then prioritise AES-GCM
+ // cipher suites.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ } else {
+ // Without AES-GCM hardware, we put the ChaCha20-Poly1305
+ // cipher suites first.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ }
+
+ DefaultCipherSuites := make([]uint16, 0, 22)
+ DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
+ DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
+
+ server := &http.Server{
+ Addr: s.tlsAddr(s.cfg.Address, true),
+ Handler: s,
+ TLSConfig: &tls.Config{
+ CurvePreferences: []tls.CurveID{
+ tls.CurveP256,
+ tls.CurveP384,
+ tls.CurveP521,
+ tls.X25519,
+ },
+ CipherSuites: DefaultCipherSuites,
+ MinVersion: tls.VersionTLS12,
+ PreferServerCipherSuites: true,
+ },
+ }
+ s.throw(EventInitSSL, server)
+
+ return server
+}
+
+// init http/2 server
+func (s *Service) initHTTP2() error {
+ return http2.ConfigureServer(s.https, &http2.Server{
+ MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams,
+ })
+}
+
+// serveFCGI starts FastCGI server.
+func (s *Service) serveFCGI() error {
+ l, err := util.CreateListener(s.cfg.FCGI.Address)
+ if err != nil {
+ return err
+ }
+
+ err = fcgi.Serve(l, s.fcgi.Handler)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+
+ if event == roadrunner.EventServerFailure {
+ // underlying rr server is dead
+ s.Stop()
+ }
+}
+
+// tlsAddr replaces listen or host port with port configured by SSL config.
+func (s *Service) tlsAddr(host string, forcePort bool) string {
+ // remove current forcePort first
+ host = strings.Split(host, ":")[0]
+
+ if forcePort || s.cfg.SSL.Port != 443 {
+ host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port)
+ }
+
+ return host
+}
diff --git a/plugins/http/service_test.go b/plugins/http/service_test.go
new file mode 100644
index 00000000..f7ee33cc
--- /dev/null
+++ b/plugins/http/service_test.go
@@ -0,0 +1,759 @@
+package http
+
+import (
+ "github.com/cenkalti/backoff/v4"
+ json "github.com/json-iterator/go"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/env"
+ "github.com/spiral/roadrunner/service/rpc"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "testing"
+ "time"
+)
+
+type testCfg struct {
+ httpCfg string
+ rpcCfg string
+ envCfg string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ if cfg.httpCfg == "" {
+ return nil
+ }
+
+ return &testCfg{target: cfg.httpCfg}
+ }
+
+ if name == rpc.ID {
+ return &testCfg{target: cfg.rpcCfg}
+ }
+
+ if name == env.ID {
+ return &testCfg{target: cfg.envCfg}
+ }
+
+ return nil
+}
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Unmarshal([]byte(cfg.target), out)
+}
+
+func Test_Service_NoConfig(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
+ assert.Error(t, err)
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusInactive, st)
+}
+
+func Test_Service_Configure_Disable(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusInactive, st)
+}
+
+func Test_Service_Configure_Enable(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":8070",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+func Test_Service_Echo(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6536",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 100)
+
+ req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ r, err := http.DefaultClient.Do(req)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err = r.Body.Close()
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ c.Stop()
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Service_Env(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(env.ID, env.NewService(map[string]string{"rr": "test"}))
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":10031",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php env pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`, envCfg: `{"env_key":"ENV_VALUE"}`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "http://localhost:10031", nil)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ r, err := http.DefaultClient.Do(req)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ assert.Equal(t, 200, r.StatusCode)
+ assert.Equal(t, "ENV_VALUE", string(b))
+
+ err = r.Body.Close()
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ c.Stop()
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+func Test_Service_ErrorEcho(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6030",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echoerr pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ goterr := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventStderrOutput {
+ if string(ctx.([]byte)) == "WORLD\n" {
+ goterr <- nil
+ }
+ }
+ })
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ r, err := http.DefaultClient.Do(req)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ <-goterr
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+ err = r.Body.Close()
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ c.Stop()
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Service_Middleware(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6032",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path == "/halt" {
+ w.WriteHeader(500)
+ _, err := w.Write([]byte("halted"))
+ if err != nil {
+ t.Errorf("error writing the data to the http reply: error %v", err)
+ }
+ } else {
+ f(w, r)
+ }
+ }
+ })
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ r, err := http.DefaultClient.Do(req)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err = r.Body.Close()
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ r, err = http.DefaultClient.Do(req)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+ b, err = ioutil.ReadAll(r.Body)
+ if err != nil {
+ c.Stop()
+ return err
+ }
+
+ assert.Equal(t, 500, r.StatusCode)
+ assert.Equal(t, "halted", string(b))
+
+ err = r.Body.Close()
+ if err != nil {
+ c.Stop()
+ return err
+ }
+ c.Stop()
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+func Test_Service_Listener(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6033",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ stop := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventServerStart {
+ stop <- nil
+ }
+ })
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ c.Stop()
+ assert.True(t, true)
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Service_Error(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6034",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "relay": "---",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ // assert error
+ err = c.Serve()
+ if err == nil {
+ return err
+ }
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Service_Error2(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6035",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ if err != nil {
+ return err
+ }
+
+ // assert error
+ err = c.Serve()
+ if err == nil {
+ return err
+ }
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Service_Error3(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6036",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers"
+ "command": "php ../../tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ // assert error
+ if err == nil {
+ return err
+ }
+
+ return nil
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+}
+
+func Test_Service_Error4(t *testing.T) {
+ bkoff := backoff.NewExponentialBackOff()
+ bkoff.MaxElapsedTime = time.Second * 15
+
+ err := backoff.Retry(func() error {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ err := c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": "----",
+ "maxRequestSize": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php broken pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`})
+ // assert error
+ if err != nil {
+ return nil
+ }
+
+ return err
+ }, bkoff)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func tmpDir() string {
+ p := os.TempDir()
+ j := json.ConfigCompatibleWithStandardLibrary
+ r, _ := j.Marshal(p)
+
+ return string(r)
+}
diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go
new file mode 100644
index 00000000..cf147be9
--- /dev/null
+++ b/plugins/http/ssl_test.go
@@ -0,0 +1,254 @@
+package http
+
+import (
+ "crypto/tls"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "net/http"
+ "testing"
+ "time"
+)
+
+var sslClient = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true,
+ },
+ },
+}
+
+func Test_SSL_Service_Echo(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "address": ":6029",
+ "ssl": {
+ "port": 6900,
+ "key": "fixtures/server.key",
+ "cert": "fixtures/server.crt"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := sslClient.Do(req)
+ assert.NoError(t, err)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err2 := r.Body.Close()
+ if err2 != nil {
+ t.Errorf("fail to close the Body: error %v", err2)
+ }
+
+ c.Stop()
+}
+
+func Test_SSL_Service_NoRedirect(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "address": ":6030",
+ "ssl": {
+ "port": 6901,
+ "key": "fixtures/server.key",
+ "cert": "fixtures/server.crt"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := sslClient.Do(req)
+ assert.NoError(t, err)
+
+ assert.Nil(t, r.TLS)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err2 := r.Body.Close()
+ if err2 != nil {
+ t.Errorf("fail to close the Body: error %v", err2)
+ }
+ c.Stop()
+}
+
+func Test_SSL_Service_Redirect(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "address": ":6831",
+ "ssl": {
+ "port": 6902,
+ "redirect": true,
+ "key": "fixtures/server.key",
+ "cert": "fixtures/server.crt"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := sslClient.Do(req)
+ assert.NoError(t, err)
+ assert.NotNil(t, r.TLS)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err2 := r.Body.Close()
+ if err2 != nil {
+ t.Errorf("fail to close the Body: error %v", err2)
+ }
+ c.Stop()
+}
+
+func Test_SSL_Service_Push(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "address": ":6032",
+ "ssl": {
+ "port": 6903,
+ "redirect": true,
+ "key": "fixtures/server.key",
+ "cert": "fixtures/server.crt"
+ },
+ "workers":{
+ "command": "php ../../tests/http/client.php push pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, st)
+
+ // should do nothing
+ s.(*Service).Stop()
+
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 500)
+
+ req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := sslClient.Do(req)
+ assert.NoError(t, err)
+
+ assert.NotNil(t, r.TLS)
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.Equal(t, "", r.Header.Get("Http2-Push"))
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+
+ err2 := r.Body.Close()
+ if err2 != nil {
+ t.Errorf("fail to close the Body: error %v", err2)
+ }
+ c.Stop()
+}
diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go
new file mode 100644
index 00000000..39a9eaf2
--- /dev/null
+++ b/plugins/http/uploads.go
@@ -0,0 +1,160 @@
+package http
+
+import (
+ "fmt"
+ json "github.com/json-iterator/go"
+ "github.com/sirupsen/logrus"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "os"
+ "sync"
+)
+
+const (
+ // UploadErrorOK - no error, the file uploaded with success.
+ UploadErrorOK = 0
+
+ // UploadErrorNoFile - no file was uploaded.
+ UploadErrorNoFile = 4
+
+ // UploadErrorNoTmpDir - missing a temporary folder.
+ UploadErrorNoTmpDir = 5
+
+ // UploadErrorCantWrite - failed to write file to disk.
+ UploadErrorCantWrite = 6
+
+ // UploadErrorExtension - forbidden file extension.
+ UploadErrorExtension = 7
+)
+
+// Uploads tree manages uploaded files tree and temporary files.
+type Uploads struct {
+ // associated temp directory and forbidden extensions.
+ cfg *UploadsConfig
+
+ // pre processed data tree for Uploads.
+ tree fileTree
+
+ // flat list of all file Uploads.
+ list []*FileUpload
+}
+
+// MarshalJSON marshal tree tree into JSON.
+func (u *Uploads) MarshalJSON() ([]byte, error) {
+ j := json.ConfigCompatibleWithStandardLibrary
+ return j.Marshal(u.tree)
+}
+
+// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
+// will be handled individually.
+func (u *Uploads) Open(log *logrus.Logger) {
+ var wg sync.WaitGroup
+ for _, f := range u.list {
+ wg.Add(1)
+ go func(f *FileUpload) {
+ defer wg.Done()
+ err := f.Open(u.cfg)
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error opening the file: error %v", err))
+ }
+ }(f)
+ }
+
+ wg.Wait()
+}
+
+// Clear deletes all temporary files.
+func (u *Uploads) Clear(log *logrus.Logger) {
+ for _, f := range u.list {
+ if f.TempFilename != "" && exists(f.TempFilename) {
+ err := os.Remove(f.TempFilename)
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error removing the file: error %v", err))
+ }
+ }
+ }
+}
+
+// FileUpload represents singular file NewUpload.
+type FileUpload struct {
+ // ID contains filename specified by the client.
+ Name string `json:"name"`
+
+ // Mime contains mime-type provided by the client.
+ Mime string `json:"mime"`
+
+ // Size of the uploaded file.
+ Size int64 `json:"size"`
+
+ // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php
+ Error int `json:"error"`
+
+ // TempFilename points to temporary file location.
+ TempFilename string `json:"tmpName"`
+
+ // associated file header
+ header *multipart.FileHeader
+}
+
+// NewUpload wraps net/http upload into PRS-7 compatible structure.
+func NewUpload(f *multipart.FileHeader) *FileUpload {
+ return &FileUpload{
+ Name: f.Filename,
+ Mime: f.Header.Get("Content-Type"),
+ Error: UploadErrorOK,
+ header: f,
+ }
+}
+
+// Open moves file content into temporary file available for PHP.
+// NOTE:
+// There is 2 deferred functions, and in case of getting 2 errors from both functions
+// error from close of temp file would be overwritten by error from the main file
+// STACK
+// DEFER FILE CLOSE (2)
+// DEFER TMP CLOSE (1)
+func (f *FileUpload) Open(cfg *UploadsConfig) (err error) {
+ if cfg.Forbids(f.Name) {
+ f.Error = UploadErrorExtension
+ return nil
+ }
+
+ file, err := f.header.Open()
+ if err != nil {
+ f.Error = UploadErrorNoFile
+ return err
+ }
+
+ defer func() {
+ // close the main file
+ err = file.Close()
+ }()
+
+ tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
+ if err != nil {
+ // most likely cause of this issue is missing tmp dir
+ f.Error = UploadErrorNoTmpDir
+ return err
+ }
+
+ f.TempFilename = tmp.Name()
+ defer func() {
+ // close the temp file
+ err = tmp.Close()
+ }()
+
+ if f.Size, err = io.Copy(tmp, file); err != nil {
+ f.Error = UploadErrorCantWrite
+ }
+
+ return err
+}
+
+// exists if file exists.
+func exists(path string) bool {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ return false
+ }
+ return true
+}
diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go
new file mode 100644
index 00000000..3f655064
--- /dev/null
+++ b/plugins/http/uploads_config.go
@@ -0,0 +1,45 @@
+package http
+
+import (
+ "os"
+ "path"
+ "strings"
+)
+
+// UploadsConfig describes file location and controls access to them.
+type UploadsConfig struct {
+ // Dir contains name of directory to control access to.
+ Dir string
+
+ // Forbid specifies list of file extensions which are forbidden for access.
+ // Example: .php, .exe, .bat, .htaccess and etc.
+ Forbid []string
+}
+
+// InitDefaults sets missing values to their default values.
+func (cfg *UploadsConfig) InitDefaults() error {
+ cfg.Forbid = []string{".php", ".exe", ".bat"}
+ return nil
+}
+
+// TmpDir returns temporary directory.
+func (cfg *UploadsConfig) TmpDir() string {
+ if cfg.Dir != "" {
+ return cfg.Dir
+ }
+
+ return os.TempDir()
+}
+
+// Forbids must return true if file extension is not allowed for the upload.
+func (cfg *UploadsConfig) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range cfg.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/plugins/http/uploads_config_test.go b/plugins/http/uploads_config_test.go
new file mode 100644
index 00000000..2b6ceebc
--- /dev/null
+++ b/plugins/http/uploads_config_test.go
@@ -0,0 +1,24 @@
+package http
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestFsConfig_Forbids(t *testing.T) {
+ cfg := UploadsConfig{Forbid: []string{".php"}}
+
+ assert.True(t, cfg.Forbids("index.php"))
+ assert.True(t, cfg.Forbids("index.PHP"))
+ assert.True(t, cfg.Forbids("phpadmin/index.bak.php"))
+ assert.False(t, cfg.Forbids("index.html"))
+}
+
+func TestFsConfig_TmpFallback(t *testing.T) {
+ cfg := UploadsConfig{Dir: "test"}
+ assert.Equal(t, "test", cfg.TmpDir())
+
+ cfg = UploadsConfig{Dir: ""}
+ assert.Equal(t, os.TempDir(), cfg.TmpDir())
+}
diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go
new file mode 100644
index 00000000..08177c72
--- /dev/null
+++ b/plugins/http/uploads_test.go
@@ -0,0 +1,434 @@
+package http
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "fmt"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner"
+ "github.com/stretchr/testify/assert"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestHandler_Upload_File(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 0, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+func TestHandler_Upload_NestedFile(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
+ fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 0, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
+}
+
+func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: "-----",
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 5, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+func TestHandler_Upload_File_Forbids(t *testing.T) {
+ h := &Handler{
+ cfg: &Config{
+ MaxRequestSize: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{".go"},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php upload pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8021", Handler: h}
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ var mb bytes.Buffer
+ w := multipart.NewWriter(&mb)
+
+ f := mustOpen("uploads_test.go")
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
+ fw, err := w.CreateFormFile("upload", f.Name())
+ assert.NotNil(t, fw)
+ assert.NoError(t, err)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
+
+ req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+ assert.NoError(t, err)
+
+ req.Header.Set("Content-Type", w.FormDataContentType())
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ fs := fileString("uploads_test.go", 7, "application/octet-stream")
+
+ assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+}
+
+func Test_FileExists(t *testing.T) {
+ assert.True(t, exists("uploads_test.go"))
+ assert.False(t, exists("uploads_test."))
+}
+
+func mustOpen(f string) *os.File {
+ r, err := os.Open(f)
+ if err != nil {
+ panic(err)
+ }
+ return r
+}
+
+type fInfo struct {
+ Name string `json:"name"`
+ Size int64 `json:"size"`
+ Mime string `json:"mime"`
+ Error int `json:"error"`
+ MD5 string `json:"md5,omitempty"`
+}
+
+func fileString(f string, errNo int, mime string) string {
+ s, err := os.Stat(f)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error stat the file, error: %v", err))
+ }
+
+ ff, err := os.Open(f)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error opening the file, error: %v", err))
+ }
+
+ defer func() {
+ er := ff.Close()
+ if er != nil {
+ fmt.Println(fmt.Errorf("error closing the file, error: %v", er))
+ }
+ }()
+
+ h := md5.New()
+ _, err = io.Copy(h, ff)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error copying the file, error: %v", err))
+ }
+
+ v := &fInfo{
+ Name: s.Name(),
+ Size: s.Size(),
+ Error: errNo,
+ Mime: mime,
+ MD5: hex.EncodeToString(h.Sum(nil)),
+ }
+
+ if errNo != 0 {
+ v.MD5 = ""
+ v.Size = 0
+ }
+
+ j := json.ConfigCompatibleWithStandardLibrary
+ r, err := j.Marshal(v)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err))
+ }
+ return string(r)
+
+}