diff options
author | Valery Piashchynski <[email protected]> | 2020-11-15 12:55:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-15 12:55:12 +0300 |
commit | 5260639a7e95d4d4a36b46fd4b19f665a0a60ef9 (patch) | |
tree | 31b2a616b385eb2e207b1f6e0b1269d37f95af75 | |
parent | a7ba4df83b4f2c67a3a0fb9d1dd35663935c90be (diff) |
Initial commit of http 2.0 plugin
27 files changed, 6197 insertions, 0 deletions
diff --git a/plugins/http/attributes/attributes.go b/plugins/http/attributes/attributes.go new file mode 100644 index 00000000..77d6ea69 --- /dev/null +++ b/plugins/http/attributes/attributes.go @@ -0,0 +1,76 @@ +package attributes + +import ( + "context" + "errors" + "net/http" +) + +type attrKey int + +const contextKey attrKey = iota + +type attrs map[string]interface{} + +func (v attrs) get(key string) interface{} { + if v == nil { + return "" + } + + return v[key] +} + +func (v attrs) set(key string, value interface{}) { + v[key] = value +} + +func (v attrs) del(key string) { + delete(v, key) +} + +// Init returns request with new context and attribute bag. +func Init(r *http.Request) *http.Request { + return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) +} + +// All returns all context attributes. +func All(r *http.Request) map[string]interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return attrs{} + } + + return v.(attrs) +} + +// Get gets the value from request context. It replaces any existing +// values. +func Get(r *http.Request, key string) interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return nil + } + + return v.(attrs).get(key) +} + +// Set sets the key to value. It replaces any existing +// values. Context specific. +func Set(r *http.Request, key string, value interface{}) error { + v := r.Context().Value(contextKey) + if v == nil { + return errors.New("unable to find `psr:attributes` context key") + } + + v.(attrs).set(key, value) + return nil +} + +// Delete deletes values associated with attribute key. +func (v attrs) Delete(key string) { + if v == nil { + return + } + + v.del(key) +} diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go new file mode 100644 index 00000000..2360fd12 --- /dev/null +++ b/plugins/http/attributes/attributes_test.go @@ -0,0 +1,79 @@ +package attributes + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +func TestAllAttributes(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + + assert.Equal(t, All(r), map[string]interface{}{ + "key": "value", + }) +} + +func TestAllAttributesNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestAllAttributesNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestGetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), "value") +} + +func TestGetAttributeNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestGetAttributeNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestSetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), "value") +} + +func TestSetAttributeNone(t *testing.T) { + r := &http.Request{} + + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } + assert.Equal(t, Get(r, "key"), nil) +} diff --git a/plugins/http/config.go b/plugins/http/config.go new file mode 100644 index 00000000..00f61652 --- /dev/null +++ b/plugins/http/config.go @@ -0,0 +1,263 @@ +package http + +import ( + "errors" + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "net" + "os" + "strings" +) + +// Config configures RoadRunner HTTP server. +type Config struct { + // Port and port to handle as http server. + Address string + + // SSL defines https server options. + SSL SSLConfig + + // FCGI configuration. You can use FastCGI without HTTP server. + FCGI *FCGIConfig + + // HTTP2 configuration + HTTP2 *HTTP2Config + + // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. + MaxRequestSize int64 + + // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For + TrustedSubnets []string + cidrs []*net.IPNet + + // Uploads configures uploads configuration. + Uploads *UploadsConfig + + // Workers configures rr server and worker pool. + Workers *roadrunner.ServerConfig +} + +// FCGIConfig for FastCGI server. +type FCGIConfig struct { + // Address and port to handle as http server. + Address string +} + +// HTTP2Config HTTP/2 server customizations. +type HTTP2Config struct { + // Enable or disable HTTP/2 extension, default enable. + Enabled bool + + // H2C enables HTTP/2 over TCP + H2C bool + + // MaxConcurrentStreams defaults to 128. + MaxConcurrentStreams uint32 +} + +// InitDefaults sets default values for HTTP/2 configuration. +func (cfg *HTTP2Config) InitDefaults() error { + cfg.Enabled = true + cfg.MaxConcurrentStreams = 128 + + return nil +} + +// SSLConfig defines https server configuration. +type SSLConfig struct { + // Port to listen as HTTPS server, defaults to 443. + Port int + + // Redirect when enabled forces all http connections to switch to https. + Redirect bool + + // Key defined private server key. + Key string + + // Cert is https certificate. + Cert string + + // Root CA file + RootCA string +} + +// EnableHTTP is true when http server must run. +func (c *Config) EnableHTTP() bool { + return c.Address != "" +} + +// EnableTLS returns true if rr must listen TLS connections. +func (c *Config) EnableTLS() bool { + return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != "" +} + +// EnableHTTP2 when HTTP/2 extension must be enabled (only with TSL). +func (c *Config) EnableHTTP2() bool { + return c.HTTP2.Enabled +} + +// EnableH2C when HTTP/2 extension must be enabled on TCP. +func (c *Config) EnableH2C() bool { + return c.HTTP2.H2C +} + +// EnableFCGI is true when FastCGI server must be enabled. +func (c *Config) EnableFCGI() bool { + return c.FCGI.Address != "" +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if c.Workers == nil { + c.Workers = &roadrunner.ServerConfig{} + } + + if c.HTTP2 == nil { + c.HTTP2 = &HTTP2Config{} + } + + if c.FCGI == nil { + c.FCGI = &FCGIConfig{} + } + + if c.Uploads == nil { + c.Uploads = &UploadsConfig{} + } + + if c.SSL.Port == 0 { + c.SSL.Port = 443 + } + + err := c.HTTP2.InitDefaults() + if err != nil { + return err + } + err = c.Uploads.InitDefaults() + if err != nil { + return err + } + err = c.Workers.InitDefaults() + if err != nil { + return err + } + + if err := cfg.Unmarshal(c); err != nil { + return err + } + + c.Workers.UpscaleDurations() + + if c.TrustedSubnets == nil { + // @see https://en.wikipedia.org/wiki/Reserved_IP_addresses + c.TrustedSubnets = []string{ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + } + } + + if err := c.parseCIDRs(); err != nil { + return err + } + + return c.Valid() +} + +func (c *Config) parseCIDRs() error { + for _, cidr := range c.TrustedSubnets { + _, cr, err := net.ParseCIDR(cidr) + if err != nil { + return err + } + + c.cidrs = append(c.cidrs, cr) + } + + return nil +} + +// IsTrusted if api can be trusted to use X-Real-Ip, X-Forwarded-For +func (c *Config) IsTrusted(ip string) bool { + if c.cidrs == nil { + return false + } + + i := net.ParseIP(ip) + if i == nil { + return false + } + + for _, cird := range c.cidrs { + if cird.Contains(i) { + return true + } + } + + return false +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + if c.Uploads == nil { + return errors.New("malformed uploads config") + } + + if c.HTTP2 == nil { + return errors.New("malformed http2 config") + } + + if c.Workers == nil { + return errors.New("malformed workers config") + } + + if c.Workers.Pool == nil { + return errors.New("malformed workers config (pool config is missing)") + } + + if err := c.Workers.Pool.Valid(); err != nil { + return err + } + + if !c.EnableHTTP() && !c.EnableTLS() && !c.EnableFCGI() { + return errors.New("unable to run http service, no method has been specified (http, https, http/2 or FastCGI)") + } + + if c.Address != "" && !strings.Contains(c.Address, ":") { + return errors.New("malformed http server address") + } + + if c.EnableTLS() { + if _, err := os.Stat(c.SSL.Key); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("key file '%s' does not exists", c.SSL.Key) + } + + return err + } + + if _, err := os.Stat(c.SSL.Cert); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("cert file '%s' does not exists", c.SSL.Cert) + } + + return err + } + + // RootCA is optional, but if provided - check it + if c.SSL.RootCA != "" { + if _, err := os.Stat(c.SSL.RootCA); err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("root ca path provided, but path '%s' does not exists", c.SSL.RootCA) + } + return err + } + } + } + + return nil +} diff --git a/plugins/http/config_test.go b/plugins/http/config_test.go new file mode 100644 index 00000000..d95e0995 --- /dev/null +++ b/plugins/http/config_test.go @@ -0,0 +1,331 @@ +package http + +import ( + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "os" + "testing" + "time" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + return j.Unmarshal([]byte(cfg.cfg), out) +} + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"address": "localhost:8080"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Valid(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.NoError(t, cfg.Valid()) +} + +func Test_Trusted_Subnets(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + TrustedSubnets: []string{"200.1.0.0/16"}, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.NoError(t, cfg.parseCIDRs()) + + assert.True(t, cfg.IsTrusted("200.1.0.10")) + assert.False(t, cfg.IsTrusted("127.0.0.0.1")) +} + +func Test_Trusted_Subnets_Err(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + TrustedSubnets: []string{"200.1.0.0"}, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.parseCIDRs()) +} + +func Test_Config_Valid_SSL(t *testing.T) { + cfg := &Config{ + Address: ":8080", + SSL: SSLConfig{ + Cert: "fixtures/server.crt", + Key: "fixtures/server.key", + }, + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Hydrate(&testCfg{httpCfg: "{}"})) + + assert.NoError(t, cfg.Valid()) + assert.True(t, cfg.EnableTLS()) + assert.Equal(t, 443, cfg.SSL.Port) +} + +func Test_Config_SSL_No_key(t *testing.T) { + cfg := &Config{ + Address: ":8080", + SSL: SSLConfig{ + Cert: "fixtures/server.crt", + }, + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_SSL_No_Cert(t *testing.T) { + cfg := &Config{ + Address: ":8080", + SSL: SSLConfig{ + Key: "fixtures/server.key", + }, + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_NoUploads(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_NoHTTP2(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 0, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_NoWorkers(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_NoPool(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 0, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_DeadPool(t *testing.T) { + cfg := &Config{ + Address: ":8080", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + }, + } + + assert.Error(t, cfg.Valid()) +} + +func Test_Config_InvalidAddress(t *testing.T) { + cfg := &Config{ + Address: "unexpected_address", + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + HTTP2: &HTTP2Config{ + Enabled: true, + }, + Workers: &roadrunner.ServerConfig{ + Command: "php tests/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + }, + } + + assert.Error(t, cfg.Valid()) +} diff --git a/plugins/http/constants.go b/plugins/http/constants.go new file mode 100644 index 00000000..a25f52a4 --- /dev/null +++ b/plugins/http/constants.go @@ -0,0 +1,6 @@ +package http + +import "net/http" + +var http2pushHeaderKey = http.CanonicalHeaderKey("http2-push") +var trailerHeaderKey = http.CanonicalHeaderKey("trailer") diff --git a/plugins/http/errors.go b/plugins/http/errors.go new file mode 100644 index 00000000..fb8762ef --- /dev/null +++ b/plugins/http/errors.go @@ -0,0 +1,25 @@ +// +build !windows + +package http + +import ( + "errors" + "net" + "os" + "syscall" +) + +// Broken pipe +var errEPIPE = errors.New("EPIPE(32) -> connection reset by peer") + +// handleWriteError just check if error was caused by aborted connection on linux +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.EPIPE { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/errors_windows.go b/plugins/http/errors_windows.go new file mode 100644 index 00000000..3d0ba04c --- /dev/null +++ b/plugins/http/errors_windows.go @@ -0,0 +1,27 @@ +// +build windows + +package http + +import ( + "errors" + "net" + "os" + "syscall" +) + +//Software caused connection abort. +//An established connection was aborted by the software in your host computer, +//possibly due to a data transmission time-out or protocol error. +var errEPIPE = errors.New("WSAECONNABORTED (10053) -> an established connection was aborted by peer") + +// handleWriteError just check if error was caused by aborted connection on windows +func handleWriteError(err error) error { + if netErr, ok2 := err.(*net.OpError); ok2 { + if syscallErr, ok3 := netErr.Err.(*os.SyscallError); ok3 { + if syscallErr.Err == syscall.WSAECONNABORTED { + return errEPIPE + } + } + } + return err +} diff --git a/plugins/http/fcgi_test.go b/plugins/http/fcgi_test.go new file mode 100644 index 00000000..e68b2e7f --- /dev/null +++ b/plugins/http/fcgi_test.go @@ -0,0 +1,105 @@ +package http + +import ( + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "github.com/yookoala/gofast" + "io/ioutil" + "net/http/httptest" + "testing" + "time" +) + +func Test_FCGI_Service_Echo(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "fcgi": { + "address": "tcp://0.0.0.0:6082" + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { assert.NoError(t, c.Serve()) }() + time.Sleep(time.Second * 1) + + fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6082") + + fcgiHandler := gofast.NewHandler( + gofast.BasicParamsMap(gofast.BasicSession), + gofast.SimpleClientFactory(fcgiConnFactory, 0), + ) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "http://site.local/?hello=world", nil) + fcgiHandler.ServeHTTP(w, req) + + body, err := ioutil.ReadAll(w.Result().Body) + + assert.NoError(t, err) + assert.Equal(t, 201, w.Result().StatusCode) + assert.Equal(t, "WORLD", string(body)) + c.Stop() +} + +func Test_FCGI_Service_Request_Uri(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "fcgi": { + "address": "tcp://0.0.0.0:6083" + }, + "workers":{ + "command": "php ../../tests/http/client.php request-uri pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { assert.NoError(t, c.Serve()) }() + time.Sleep(time.Second * 1) + + fcgiConnFactory := gofast.SimpleConnFactory("tcp", "0.0.0.0:6083") + + fcgiHandler := gofast.NewHandler( + gofast.BasicParamsMap(gofast.BasicSession), + gofast.SimpleClientFactory(fcgiConnFactory, 0), + ) + + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "http://site.local/hello-world", nil) + fcgiHandler.ServeHTTP(w, req) + + body, err := ioutil.ReadAll(w.Result().Body) + + assert.NoError(t, err) + assert.Equal(t, 200, w.Result().StatusCode) + assert.Equal(t, "http://site.local/hello-world", string(body)) + c.Stop() +} diff --git a/plugins/http/fixtures/server.crt b/plugins/http/fixtures/server.crt new file mode 100644 index 00000000..24d67fd7 --- /dev/null +++ b/plugins/http/fixtures/server.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICTTCCAdOgAwIBAgIJAOKyUd+llTRKMAoGCCqGSM49BAMCMGMxCzAJBgNVBAYT +AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv +MRMwEQYDVQQKDApSb2FkUnVubmVyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTgw +OTMwMTMzNDUzWhcNMjgwOTI3MTMzNDUzWjBjMQswCQYDVQQGEwJVUzETMBEGA1UE +CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzETMBEGA1UECgwK +Um9hZFJ1bm5lcjESMBAGA1UEAwwJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EE +ACIDYgAEVnbShsM+l5RR3wfWWmGhzuFGwNzKCk7i9xyobDIyBUxG/UUSfj7KKlUX +puDnDEtF5xXcepl744CyIAYFLOXHb5WqI4jCOzG0o9f/00QQ4bQudJOdbqV910QF +C2vb7Fxro1MwUTAdBgNVHQ4EFgQU9xUexnbB6ORKayA7Pfjzs33otsAwHwYDVR0j +BBgwFoAU9xUexnbB6ORKayA7Pfjzs33otsAwDwYDVR0TAQH/BAUwAwEB/zAKBggq +hkjOPQQDAgNoADBlAjEAue3HhR/MUhxoa9tSDBtOJT3FYbDQswrsdqBTz97CGKst +e7XeZ3HMEvEXy0hGGEMhAjAqcD/4k9vViVppgWFtkk6+NFbm+Kw/QeeAiH5FgFSj +8xQcb+b7nPwNLp3JOkXkVd4= +-----END CERTIFICATE----- diff --git a/plugins/http/fixtures/server.key b/plugins/http/fixtures/server.key new file mode 100644 index 00000000..7501dd46 --- /dev/null +++ b/plugins/http/fixtures/server.key @@ -0,0 +1,9 @@ +-----BEGIN EC PARAMETERS----- +BgUrgQQAIg== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDCQP8utxNbHR6xZOLAJgUhn88r6IrPqmN0MsgGJM/jePB+T9UhkmIU8 +PMm2HeScbcugBwYFK4EEACKhZANiAARWdtKGwz6XlFHfB9ZaYaHO4UbA3MoKTuL3 +HKhsMjIFTEb9RRJ+PsoqVRem4OcMS0XnFdx6mXvjgLIgBgUs5cdvlaojiMI7MbSj +1//TRBDhtC50k51upX3XRAULa9vsXGs= +-----END EC PRIVATE KEY----- diff --git a/plugins/http/h2c_test.go b/plugins/http/h2c_test.go new file mode 100644 index 00000000..f17538bc --- /dev/null +++ b/plugins/http/h2c_test.go @@ -0,0 +1,83 @@ +package http + +import ( + "net/http" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" +) + +func Test_Service_H2C(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "address": ":6029", + "http2": {"h2c":true}, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error serving: %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("PRI", "http://localhost:6029?hello=world", nil) + if err != nil { + return err + } + + req.Header.Add("Upgrade", "h2c") + req.Header.Add("Connection", "HTTP2-Settings") + req.Header.Add("HTTP2-Settings", "") + + r, err2 := http.DefaultClient.Do(req) + if err2 != nil { + return err2 + } + + assert.Equal(t, "101 Switching Protocols", r.Status) + + err3 := r.Body.Close() + if err3 != nil { + return err3 + } + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} diff --git a/plugins/http/handler.go b/plugins/http/handler.go new file mode 100644 index 00000000..eca05483 --- /dev/null +++ b/plugins/http/handler.go @@ -0,0 +1,208 @@ +package http + +import ( + "fmt" + "net" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" +) + +const ( + // EventResponse thrown after the request been processed. See ErrorEvent as payload. + EventResponse = iota + 500 + + // EventError thrown on any non job error provided by road runner server. + EventError +) + +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request + + // Error - associated error, if any. + Error error + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ErrorEvent) Elapsed() time.Duration { + return e.elapsed +} + +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request + + // Response contains service response. + Response *Response + + // event timings + start time.Time + elapsed time.Duration +} + +// Elapsed returns duration of the invocation. +func (e *ResponseEvent) Elapsed() time.Duration { + return e.elapsed +} + +// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, +// parsed files and query, payload will include parsed form dataTree (if any). +type Handler struct { + cfg *Config + log *logrus.Logger + rr *roadrunner.Server + mul sync.Mutex + lsn func(event int, ctx interface{}) +} + +// Listen attaches handler event controller. +func (h *Handler) Listen(l func(event int, ctx interface{})) { + h.mul.Lock() + defer h.mul.Unlock() + + h.lsn = l +} + +// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + // validating request size + if h.cfg.MaxRequestSize != 0 { + if length := r.Header.Get("content-length"); length != "" { + if size, err := strconv.ParseInt(length, 10, 64); err != nil { + h.handleError(w, r, err, start) + return + } else if size > h.cfg.MaxRequestSize*1024*1024 { + h.handleError(w, r, errors.New("request body max size is exceeded"), start) + return + } + } + } + + req, err := NewRequest(r, h.cfg.Uploads) + if err != nil { + h.handleError(w, r, err, start) + return + } + + // proxy IP resolution + h.resolveIP(req) + + req.Open(h.log) + defer req.Close(h.log) + + p, err := req.Payload() + if err != nil { + h.handleError(w, r, err, start) + return + } + + rsp, err := h.rr.Exec(p) + if err != nil { + h.handleError(w, r, err, start) + return + } + + resp, err := NewResponse(rsp) + if err != nil { + h.handleError(w, r, err, start) + return + } + + h.handleResponse(req, resp, start) + err = resp.Write(w) + if err != nil { + h.handleError(w, r, err, start) + } +} + +// handleError sends error. +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start time.Time) { + // if pipe is broken, there is no sense to write the header + // in this case we just report about error + if err == errEPIPE { + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) + return + } + // ResponseWriter is ok, write the error code + w.WriteHeader(500) + _, err2 := w.Write([]byte(err.Error())) + // error during the writing to the ResponseWriter + if err2 != nil { + // concat original error with ResponseWriter error + h.throw(EventError, &ErrorEvent{Request: r, Error: errors.New(fmt.Sprintf("error: %v, during handle this error, ResponseWriter error occurred: %v", err, err2)), start: start, elapsed: time.Since(start)}) + return + } + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) +} + +// handleResponse triggers response event. +func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) { + h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, elapsed: time.Since(start)}) +} + +// throw invokes event handler if any. +func (h *Handler) throw(event int, ctx interface{}) { + h.mul.Lock() + defer h.mul.Unlock() + + if h.lsn != nil { + h.lsn(event, ctx) + } +} + +// get real ip passing multiple proxy +func (h *Handler) resolveIP(r *Request) { + if !h.cfg.IsTrusted(r.RemoteAddr) { + return + } + + if r.Header.Get("X-Forwarded-For") != "" { + ips := strings.Split(r.Header.Get("X-Forwarded-For"), ",") + ipCount := len(ips) + + for i := ipCount - 1; i >= 0; i-- { + addr := strings.TrimSpace(ips[i]) + if net.ParseIP(addr) != nil { + r.RemoteAddr = addr + return + } + } + + return + } + + // The logic here is the following: + // In general case, we only expect X-Real-Ip header. If it exist, we get the IP addres from header and set request Remote address + // But, if there is no X-Real-Ip header, we also trying to check CloudFlare headers + // True-Client-IP is a general CF header in which copied information from X-Real-Ip in CF. + // CF-Connecting-IP is an Enterprise feature and we check it last in order. + // This operations are near O(1) because Headers struct are the map type -> type MIMEHeader map[string][]string + if r.Header.Get("X-Real-Ip") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("X-Real-Ip")) + return + } + + if r.Header.Get("True-Client-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("True-Client-IP")) + return + } + + if r.Header.Get("CF-Connecting-IP") != "" { + r.RemoteAddr = fetchIP(r.Header.Get("CF-Connecting-IP")) + } +} diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go new file mode 100644 index 00000000..cb1cd728 --- /dev/null +++ b/plugins/http/handler_test.go @@ -0,0 +1,1961 @@ +package http + +import ( + "bytes" + "context" + "github.com/spiral/roadrunner" + "github.com/stretchr/testify/assert" + "io/ioutil" + "mime/multipart" + "net/http" + "net/http/httptest" + "net/url" + "os" + "runtime" + "strings" + "testing" + "time" +) + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } + return string(b), r, err +} + +// get request and return body +func getHeader(url string, h map[string]string) (string, *http.Response, error) { + req, err := http.NewRequest("GET", url, bytes.NewBuffer(nil)) + if err != nil { + return "", nil, err + } + + for k, v := range h { + req.Header.Set(k, v) + } + + r, err := http.DefaultClient.Do(req) + if err != nil { + return "", nil, err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } + return string(b), r, err +} + +func TestHandler_Echo(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + body, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func Test_HandlerErrors(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + wr := httptest.NewRecorder() + rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data"))) + + h.ServeHTTP(wr, rq) + assert.Equal(t, 500, wr.Code) +} + +func Test_Handler_JSON_error(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + wr := httptest.NewRecorder() + rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd"))) + rq.Header.Add("Content-Type", "application/json") + rq.Header.Add("Content-Size", "3") + + h.ServeHTTP(wr, rq) + assert.Equal(t, 500, wr.Code) +} + +func TestHandler_Headers(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php header pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8078", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + + req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) + assert.NoError(t, err) + + req.Header.Add("input", "sample") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "world", r.Header.Get("Header")) + assert.Equal(t, "SAMPLE", string(b)) +} + +func TestHandler_Empty_User_Agent(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php user-agent pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8088", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + assert.NoError(t, err) + + req.Header.Add("user-agent", "") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "", string(b)) +} + +func TestHandler_User_Agent(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php user-agent pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8088", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) + assert.NoError(t, err) + + req.Header.Add("User-Agent", "go-agent") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "go-agent", string(b)) +} + +func TestHandler_Cookies(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php cookie pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8079", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("GET", "http://localhost:8079", nil) + assert.NoError(t, err) + + req.AddCookie(&http.Cookie{Name: "input", Value: "input-value"}) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "INPUT-VALUE", string(b)) + + for _, c := range r.Cookies() { + assert.Equal(t, "output", c.Name) + assert.Equal(t, "cookie-output", c.Value) + } +} + +func TestHandler_JsonPayload_POST(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8090", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest( + "POST", + "http://localhost"+hs.Addr, + bytes.NewBufferString(`{"key":"value"}`), + ) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestHandler_JsonPayload_PUT(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8081", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestHandler_JsonPayload_PATCH(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php payload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8082", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/json") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, `{"value":"key"}`, string(b)) +} + +func TestHandler_FormData_POST(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8083", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_FormData_POST_Overwrite(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8083", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value1") + form.Add("key", "value2") + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"key":"value2","arr":{"x":{"y":null}}}`, string(b)) +} + +func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8083", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_FormData_PUT(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8084", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_FormData_PATCH(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8085", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + form := url.Values{} + + form.Add("key", "value") + form.Add("name[]", "name1") + form.Add("name[]", "name2") + form.Add("name[]", "name3") + form.Add("arr[x][y][z]", "y") + form.Add("arr[x][y][e]", "f") + form.Add("arr[c]p", "l") + form.Add("arr[c]z", "") + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, strings.NewReader(form.Encode())) + assert.NoError(t, err) + + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_Multipart_POST(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8019", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name1") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name2") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][e]", "f") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_Multipart_PUT(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8020", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name1") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name2") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][e]", "f") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } + + req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_Multipart_PATCH(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php data pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name1") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name2") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][e]", "f") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } + + req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) +} + +func TestHandler_Error(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php error pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + _, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func TestHandler_Error2(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php error2 pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + _, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func TestHandler_Error3(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php pid pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + b2 := &bytes.Buffer{} + for i := 0; i < 1024*1024; i++ { + b2.Write([]byte(" ")) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, b2) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() + + assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) +} + +func TestHandler_ResponseDuration(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + gotresp := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventResponse { + c := ctx.(*ResponseEvent) + + if c.Elapsed() > 0 { + close(gotresp) + } + } + }) + + body, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-gotresp + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func TestHandler_ResponseDurationDelayed(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echoDelay pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + gotresp := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventResponse { + c := ctx.(*ResponseEvent) + + if c.Elapsed() > time.Second { + close(gotresp) + } + } + }) + + body, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-gotresp + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func TestHandler_ErrorDuration(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php error pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + goterr := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventError { + c := ctx.(*ErrorEvent) + + if c.Elapsed() > 0 { + close(goterr) + } + } + }) + + _, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-goterr + + assert.Equal(t, 500, r.StatusCode) +} + +func TestHandler_IP(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + TrustedSubnets: []string{ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php ip pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + body, r, err := get("http://127.0.0.1:8177/") + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "127.0.0.1", body) +} + +func TestHandler_XRealIP(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + TrustedSubnets: []string{ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php ip pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ + "X-Real-Ip": "200.0.0.1", + }) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "200.0.0.1", body) +} + +func TestHandler_XForwardedFor(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + TrustedSubnets: []string{ + "10.0.0.0/8", + "127.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + "100.0.0.0/16", + "200.0.0.0/16", + "::1/128", + "fc00::/7", + "fe80::/10", + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php ip pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ + "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1", + }) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "101.0.0.1", body) + + body, r, err = getHeader("http://127.0.0.1:8177/", map[string]string{ + "X-Forwarded-For": "100.0.0.1, 200.0.0.1, 101.0.0.1, invalid", + }) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "101.0.0.1", body) +} + +func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + TrustedSubnets: []string{ + "10.0.0.0/8", + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php ip pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ + "X-Forwarded-For": "100.0.0.1, 200.0.0.1, invalid, 101.0.0.1", + }) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "127.0.0.1", body) +} + +func BenchmarkHandler_Listen_Echo(b *testing.B) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + err := h.rr.Start() + if err != nil { + b.Errorf("error starting the worker pool: error %v", err) + } + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + b.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + b.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + bb := "WORLD" + for n := 0; n < b.N; n++ { + r, err := http.Get("http://localhost:8177/?hello=world") + if err != nil { + b.Fail() + } + // Response might be nil here + if r != nil { + br, err := ioutil.ReadAll(r.Body) + if err != nil { + b.Errorf("error reading Body: error %v", err) + } + if string(br) != bb { + b.Fail() + } + err = r.Body.Close() + if err != nil { + b.Errorf("error closing the Body: error %v", err) + } + } else { + b.Errorf("got nil response") + } + } +} diff --git a/plugins/http/parse.go b/plugins/http/parse.go new file mode 100644 index 00000000..9b58d328 --- /dev/null +++ b/plugins/http/parse.go @@ -0,0 +1,147 @@ +package http + +import ( + "net/http" +) + +// MaxLevel defines maximum tree depth for incoming request data and files. +const MaxLevel = 127 + +type dataTree map[string]interface{} +type fileTree map[string]interface{} + +// parseData parses incoming request body into data tree. +func parseData(r *http.Request) dataTree { + data := make(dataTree) + if r.PostForm != nil { + for k, v := range r.PostForm { + data.push(k, v) + } + } + + if r.MultipartForm != nil { + for k, v := range r.MultipartForm.Value { + data.push(k, v) + } + } + + return data +} + +// pushes value into data tree. +func (d dataTree) push(k string, v []string) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d dataTree) mount(i []string, v []string) { + if len(i) == 1 { + // single value context (last element) + d[i[0]] = v[len(v)-1] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(dataTree).mount(i[1:], v) + return + } + + d[i[0]] = make(dataTree) + d[i[0]].(dataTree).mount(i[1:], v) +} + +// parse incoming dataTree request into JSON (including contentMultipart form dataTree) +func parseUploads(r *http.Request, cfg *UploadsConfig) *Uploads { + u := &Uploads{ + cfg: cfg, + tree: make(fileTree), + list: make([]*FileUpload, 0), + } + + for k, v := range r.MultipartForm.File { + files := make([]*FileUpload, 0, len(v)) + for _, f := range v { + files = append(files, NewUpload(f)) + } + + u.list = append(u.list, files...) + u.tree.push(k, files) + } + + return u +} + +// pushes new file upload into it's proper place. +func (d fileTree) push(k string, v []*FileUpload) { + keys := fetchIndexes(k) + if len(keys) <= MaxLevel { + d.mount(keys, v) + } +} + +// mount mounts data tree recursively. +func (d fileTree) mount(i []string, v []*FileUpload) { + if len(i) == 1 { + // single value context + d[i[0]] = v[0] + return + } + + if len(i) == 2 && i[1] == "" { + // non associated array of elements + d[i[0]] = v + return + } + + if p, ok := d[i[0]]; ok { + p.(fileTree).mount(i[1:], v) + return + } + + d[i[0]] = make(fileTree) + d[i[0]].(fileTree).mount(i[1:], v) +} + +// fetchIndexes parses input name and splits it into separate indexes list. +func fetchIndexes(s string) []string { + var ( + pos int + ch string + keys = make([]string, 1) + ) + + for _, c := range s { + ch = string(c) + switch ch { + case " ": + // ignore all spaces + continue + case "[": + pos = 1 + continue + case "]": + if pos == 1 { + keys = append(keys, "") + } + pos = 2 + default: + if pos == 1 || pos == 2 { + keys = append(keys, "") + } + + keys[len(keys)-1] += ch + pos = 0 + } + } + + return keys +} diff --git a/plugins/http/parse_test.go b/plugins/http/parse_test.go new file mode 100644 index 00000000..f95a3f9d --- /dev/null +++ b/plugins/http/parse_test.go @@ -0,0 +1,52 @@ +package http + +import "testing" + +var samples = []struct { + in string + out []string +}{ + {"key", []string{"key"}}, + {"key[subkey]", []string{"key", "subkey"}}, + {"key[subkey]value", []string{"key", "subkey", "value"}}, + {"key[subkey][value]", []string{"key", "subkey", "value"}}, + {"key[subkey][value][]", []string{"key", "subkey", "value", ""}}, + {"key[subkey] [value][]", []string{"key", "subkey", "value", ""}}, + {"key [ subkey ] [ value ] [ ]", []string{"key", "subkey", "value", ""}}, +} + +func Test_FetchIndexes(t *testing.T) { + for _, tt := range samples { + t.Run(tt.in, func(t *testing.T) { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + t.Errorf("got %q, want %q", r, tt.out) + } + }) + } +} + +func BenchmarkConfig_FetchIndexes(b *testing.B) { + for _, tt := range samples { + for n := 0; n < b.N; n++ { + r := fetchIndexes(tt.in) + if !same(r, tt.out) { + b.Fail() + } + } + } +} + +func same(in, out []string) bool { + if len(in) != len(out) { + return false + } + + for i, v := range in { + if v != out[i] { + return false + } + } + + return true +} diff --git a/plugins/http/request.go b/plugins/http/request.go new file mode 100644 index 00000000..8da5440f --- /dev/null +++ b/plugins/http/request.go @@ -0,0 +1,183 @@ +package http + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" + + json "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/http/attributes" +) + +const ( + defaultMaxMemory = 32 << 20 // 32 MB + contentNone = iota + 900 + contentStream + contentMultipart + contentFormData +) + +// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. +type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + + // Protocol includes HTTP protocol version. + Protocol string `json:"protocol"` + + // Method contains name of HTTP method used for the request. + Method string `json:"method"` + + // URI contains full request URI with scheme and query. + URI string `json:"uri"` + + // Header contains list of request headers. + Header http.Header `json:"headers"` + + // Cookies contains list of request cookies. + Cookies map[string]string `json:"cookies"` + + // RawQuery contains non parsed query string (to be parsed on php end). + RawQuery string `json:"rawQuery"` + + // Parsed indicates that request body has been parsed on RR end. + Parsed bool `json:"parsed"` + + // Uploads contains list of uploaded files, their names, sized and associations with temporary files. + Uploads *Uploads `json:"uploads"` + + // Attributes can be set by chained mdwr to safely pass value from Golang to PHP. See: GetAttribute, SetAttribute functions. + Attributes map[string]interface{} `json:"attributes"` + + // request body can be parsedData or []byte + body interface{} +} + +func fetchIP(pair string) string { + if !strings.ContainsRune(pair, ':') { + return pair + } + + addr, _, _ := net.SplitHostPort(pair) + return addr +} + +// NewRequest creates new PSR7 compatible request using net/http request. +func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { + req = &Request{ + RemoteAddr: fetchIP(r.RemoteAddr), + Protocol: r.Proto, + Method: r.Method, + URI: uri(r), + Header: r.Header, + Cookies: make(map[string]string), + RawQuery: r.URL.RawQuery, + Attributes: attributes.All(r), + } + + for _, c := range r.Cookies() { + if v, err := url.QueryUnescape(c.Value); err == nil { + req.Cookies[c.Name] = v + } + } + + switch req.contentType() { + case contentNone: + return req, nil + + case contentStream: + req.body, err = ioutil.ReadAll(r.Body) + return req, err + + case contentMultipart: + if err = r.ParseMultipartForm(defaultMaxMemory); err != nil { + return nil, err + } + + req.Uploads = parseUploads(r, cfg) + fallthrough + case contentFormData: + if err = r.ParseForm(); err != nil { + return nil, err + } + + req.body = parseData(r) + } + + req.Parsed = true + return req, nil +} + +// Open moves all uploaded files to temporary directory so it can be given to php later. +func (r *Request) Open(log *logrus.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Open(log) +} + +// Close clears all temp file uploads +func (r *Request) Close(log *logrus.Logger) { + if r.Uploads == nil { + return + } + + r.Uploads.Clear(log) +} + +// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open +// files prior to calling this method. +func (r *Request) Payload() (p *roadrunner.Payload, err error) { + p = &roadrunner.Payload{} + + j := json.ConfigCompatibleWithStandardLibrary + if p.Context, err = j.Marshal(r); err != nil { + return nil, err + } + + if r.Parsed { + if p.Body, err = j.Marshal(r.body); err != nil { + return nil, err + } + } else if r.body != nil { + p.Body = r.body.([]byte) + } + + return p, nil +} + +// contentType returns the payload content type. +func (r *Request) contentType() int { + if r.Method == "HEAD" || r.Method == "OPTIONS" { + return contentNone + } + + ct := r.Header.Get("content-type") + if strings.Contains(ct, "application/x-www-form-urlencoded") { + return contentFormData + } + + if strings.Contains(ct, "multipart/form-data") { + return contentMultipart + } + + return contentStream +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.URL.Host != "" { + return r.URL.String() + } + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/plugins/http/response.go b/plugins/http/response.go new file mode 100644 index 00000000..f34754be --- /dev/null +++ b/plugins/http/response.go @@ -0,0 +1,107 @@ +package http + +import ( + "io" + "net/http" + "strings" + + json "github.com/json-iterator/go" + + "github.com/spiral/roadrunner" +) + + +// Response handles PSR7 response logic. +type Response struct { + // Status contains response status. + Status int `json:"status"` + + // Header contains list of response headers. + Headers map[string][]string `json:"headers"` + + // associated body payload. + body interface{} +} + +// NewResponse creates new response based on given rr payload. +func NewResponse(p *roadrunner.Payload) (*Response, error) { + r := &Response{body: p.Body} + j := json.ConfigCompatibleWithStandardLibrary + if err := j.Unmarshal(p.Context, r); err != nil { + return nil, err + } + + return r, nil +} + +// Write writes response headers, status and body into ResponseWriter. +func (r *Response) Write(w http.ResponseWriter) error { + // INFO map is the reference type in golang + p := handlePushHeaders(r.Headers) + if pusher, ok := w.(http.Pusher); ok { + for _, v := range p { + err := pusher.Push(v, nil) + if err != nil { + return err + } + } + } + + handleTrailers(r.Headers) + for n, h := range r.Headers { + for _, v := range h { + w.Header().Add(n, v) + } + } + + w.WriteHeader(r.Status) + + if data, ok := r.body.([]byte); ok { + _, err := w.Write(data) + if err != nil { + return handleWriteError(err) + } + } + + if rc, ok := r.body.(io.Reader); ok { + if _, err := io.Copy(w, rc); err != nil { + return err + } + } + + return nil +} + +func handlePushHeaders(h map[string][]string) []string { + var p []string + pushHeader, ok := h[http2pushHeaderKey] + if !ok { + return p + } + + p = append(p, pushHeader...) + + delete(h, http2pushHeaderKey) + + return p +} + +func handleTrailers(h map[string][]string) { + trailers, ok := h[trailerHeaderKey] + if !ok { + return + } + + for _, tr := range trailers { + for _, n := range strings.Split(tr, ",") { + n = strings.Trim(n, "\t ") + if v, ok := h[n]; ok { + h["Trailer:"+n] = v + + delete(h, n) + } + } + } + + delete(h, trailerHeaderKey) +} diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go new file mode 100644 index 00000000..1f394276 --- /dev/null +++ b/plugins/http/response_test.go @@ -0,0 +1,162 @@ +package http + +import ( + "bytes" + "errors" + "net/http" + "testing" + + "github.com/spiral/roadrunner" + "github.com/stretchr/testify/assert" +) + +type testWriter struct { + h http.Header + buf bytes.Buffer + wroteHeader bool + code int + err error + pushErr error + pushes []string +} + +func (tw *testWriter) Header() http.Header { return tw.h } + +func (tw *testWriter) Write(p []byte) (int, error) { + if !tw.wroteHeader { + tw.WriteHeader(http.StatusOK) + } + + n, e := tw.buf.Write(p) + if e == nil { + e = tw.err + } + + return n, e +} + +func (tw *testWriter) WriteHeader(code int) { tw.wroteHeader = true; tw.code = code } + +func (tw *testWriter) Push(target string, opts *http.PushOptions) error { + tw.pushes = append(tw.pushes, target) + + return tw.pushErr +} + +func TestNewResponse_Error(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)}) + assert.Error(t, err) + assert.Nil(t, r) +} + +func TestNewResponse_Write(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + Body: []byte(`sample body`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "sample body", w.buf.String()) +} + +func TestNewResponse_Stream(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + // r is pointer, so, it might be nil + if r == nil { + t.Fatal("response is nil") + } + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, 301, w.code) + assert.Equal(t, "value", w.h.Get("key")) + assert.Equal(t, "hello world", w.buf.String()) +} + +func TestNewResponse_StreamError(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), + }) + + // r is pointer, so, it might be nil + if r == nil { + t.Fatal("response is nil") + } + + r.body = &bytes.Buffer{} + r.body.(*bytes.Buffer).WriteString("hello world") + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string)), err: errors.New("error")} + assert.Error(t, r.Write(w)) +} + +func TestWrite_HandlesPush(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Nil(t, w.h["Http2-Push"]) + assert.Equal(t, []string{"/test.js"}, w.pushes) +} + +func TestWrite_HandlesTrailers(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Nil(t, w.h[trailerHeaderKey]) + assert.Nil(t, w.h["foo"]) //nolint:golint,staticcheck + assert.Nil(t, w.h["baz"]) //nolint:golint,staticcheck + + assert.Equal(t, "test", w.h.Get("Trailer:foo")) + assert.Equal(t, "demo", w.h.Get("Trailer:bar")) +} + +func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { + r, err := NewResponse(&roadrunner.Payload{ + Context: []byte( + `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + + w := &testWriter{h: http.Header(make(map[string][]string))} + assert.NoError(t, r.Write(w)) + + assert.Equal(t, "a", w.h.Get("Trailer:foo")) + assert.Equal(t, "b", w.h.Get("Trailer:bar")) + assert.Equal(t, "c", w.h.Get("Trailer:baz")) +} diff --git a/plugins/http/rpc.go b/plugins/http/rpc.go new file mode 100644 index 00000000..7b38dece --- /dev/null +++ b/plugins/http/rpc.go @@ -0,0 +1,34 @@ +package http + +import ( + "github.com/pkg/errors" + "github.com/spiral/roadrunner/util" +) + +type rpcServer struct{ svc *Service } + +// WorkerList contains list of workers. +type WorkerList struct { + // Workers is list of workers. + Workers []*util.State `json:"workers"` +} + +// Reset resets underlying RR worker pool and restarts all of it's workers. +func (rpc *rpcServer) Reset(reset bool, r *string) error { + if rpc.svc == nil || rpc.svc.handler == nil { + return errors.New("http server is not running") + } + + *r = "OK" + return rpc.svc.Server().Reset() +} + +// Workers returns list of active workers and their stats. +func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { + if rpc.svc == nil || rpc.svc.handler == nil { + return errors.New("http server is not running") + } + + r.Workers, err = util.ServerState(rpc.svc.Server()) + return err +} diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go new file mode 100644 index 00000000..e57a8699 --- /dev/null +++ b/plugins/http/rpc_test.go @@ -0,0 +1,221 @@ +package http + +import ( + json "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" + "os" + "strconv" + "testing" + "time" +) + +func Test_RPC(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, + httpCfg: `{ + "enable": true, + "address": ":16031", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + + time.Sleep(time.Second) + + res, _, err := get("http://localhost:16031") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) + + cl, err := rs.Client() + assert.NoError(t, err) + + r := "" + assert.NoError(t, cl.Call("http.Reset", true, &r)) + assert.Equal(t, "OK", r) + + res2, _, err := get("http://localhost:16031") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) + assert.NotEqual(t, res, res2) + c.Stop() +} + +func Test_RPC_Unix(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + sock := `unix://` + os.TempDir() + `/rpc.unix` + j := json.ConfigCompatibleWithStandardLibrary + data, _ := j.Marshal(sock) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":` + string(data) + `}`, + httpCfg: `{ + "enable": true, + "address": ":6032", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + res, _, err := get("http://localhost:6032") + if err != nil { + c.Stop() + t.Fatal(err) + } + if ss.rr.Workers() != nil && len(ss.rr.Workers()) > 0 { + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) + } else { + c.Stop() + t.Fatal("no workers initialized") + } + + cl, err := rs.Client() + if err != nil { + c.Stop() + t.Fatal(err) + } + + r := "" + assert.NoError(t, cl.Call("http.Reset", true, &r)) + assert.Equal(t, "OK", r) + + res2, _, err := get("http://localhost:6032") + if err != nil { + c.Stop() + t.Fatal(err) + } + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) + assert.NotEqual(t, res, res2) + c.Stop() +} + +func Test_Workers(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, + httpCfg: `{ + "enable": true, + "address": ":6033", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + cl, err := rs.Client() + assert.NoError(t, err) + + r := &WorkerList{} + assert.NoError(t, cl.Call("http.Workers", true, &r)) + assert.Len(t, r.Workers, 1) + + assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid) + c.Stop() +} + +func Test_Errors(t *testing.T) { + r := &rpcServer{nil} + + assert.Error(t, r.Reset(true, nil)) + assert.Error(t, r.Workers(true, nil)) +} diff --git a/plugins/http/service.go b/plugins/http/service.go new file mode 100644 index 00000000..25a10064 --- /dev/null +++ b/plugins/http/service.go @@ -0,0 +1,427 @@ +package http + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/fcgi" + "net/url" + "strings" + "sync" + + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/env" + "github.com/spiral/roadrunner/service/http/attributes" + "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/util" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "golang.org/x/sys/cpu" +) + +const ( + // ID contains default service name. + ID = "http" + + // EventInitSSL thrown at moment of https initialization. SSL server passed as context. + EventInitSSL = 750 +) + +var couldNotAppendPemError = errors.New("could not append Certs from PEM") + +// http middleware type. +type middleware func(f http.HandlerFunc) http.HandlerFunc + +// Service manages rr, http servers. +type Service struct { + sync.Mutex + sync.WaitGroup + + cfg *Config + log *logrus.Logger + cprod roadrunner.CommandProducer + env env.Environment + lsns []func(event int, ctx interface{}) + mdwr []middleware + + rr *roadrunner.Server + controller roadrunner.Controller + handler *Handler + + http *http.Server + https *http.Server + fcgi *http.Server +} + +// Attach attaches controller. Currently only one controller is supported. +func (s *Service) Attach(w roadrunner.Controller) { + s.controller = w +} + +// ProduceCommands changes the default command generator method +func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { + s.cprod = producer +} + +// AddMiddleware adds new net/http mdwr. +func (s *Service) AddMiddleware(m middleware) { + s.mdwr = append(s.mdwr, m) +} + +// AddListener attaches server event controller. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.lsns = append(s.lsns, l) +} + +// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of +// misconfiguration. Services must not be used without proper configuration pushed first. +func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) { + s.cfg = cfg + s.log = log + s.env = e + + if r != nil { + if err := r.Register(ID, &rpcServer{s}); err != nil { + return false, err + } + } + + if !cfg.EnableHTTP() && !cfg.EnableTLS() && !cfg.EnableFCGI() { + return false, nil + } + + return true, nil +} + +// Serve serves the svc. +func (s *Service) Serve() error { + s.Lock() + + if s.env != nil { + if err := s.env.Copy(s.cfg.Workers); err != nil { + return nil + } + } + + s.cfg.Workers.CommandProducer = s.cprod + s.cfg.Workers.SetEnv("RR_HTTP", "true") + + s.rr = roadrunner.NewServer(s.cfg.Workers) + s.rr.Listen(s.throw) + + if s.controller != nil { + s.rr.Attach(s.controller) + } + + s.handler = &Handler{cfg: s.cfg, rr: s.rr} + s.handler.Listen(s.throw) + + if s.cfg.EnableHTTP() { + if s.cfg.EnableH2C() { + s.http = &http.Server{Addr: s.cfg.Address, Handler: h2c.NewHandler(s, &http2.Server{})} + } else { + s.http = &http.Server{Addr: s.cfg.Address, Handler: s} + } + } + + if s.cfg.EnableTLS() { + s.https = s.initSSL() + if s.cfg.SSL.RootCA != "" { + err := s.appendRootCa() + if err != nil { + return err + } + } + + if s.cfg.EnableHTTP2() { + if err := s.initHTTP2(); err != nil { + return err + } + } + } + + if s.cfg.EnableFCGI() { + s.fcgi = &http.Server{Handler: s} + } + + s.Unlock() + + if err := s.rr.Start(); err != nil { + return err + } + defer s.rr.Stop() + + err := make(chan error, 3) + + if s.http != nil { + go func() { + httpErr := s.http.ListenAndServe() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } + }() + } + + if s.https != nil { + go func() { + httpErr := s.https.ListenAndServeTLS( + s.cfg.SSL.Cert, + s.cfg.SSL.Key, + ) + + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + return + } + err <- nil + }() + } + + if s.fcgi != nil { + go func() { + httpErr := s.serveFCGI() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + return + } + err <- nil + }() + } + return <-err +} + +// Stop stops the http. +func (s *Service) Stop() { + s.Lock() + defer s.Unlock() + + if s.fcgi != nil { + s.Add(1) + go func() { + defer s.Done() + err := s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + // Stop() error + // push error from goroutines to the channel and block unil error or success shutdown or timeout + s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) + return + } + }() + } + + if s.https != nil { + s.Add(1) + go func() { + defer s.Done() + err := s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) + return + } + }() + } + + if s.http != nil { + s.Add(1) + go func() { + defer s.Done() + err := s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) + return + } + }() + } + + s.Wait() +} + +// Server returns associated rr server (if any). +func (s *Service) Server() *roadrunner.Server { + s.Lock() + defer s.Unlock() + + return s.rr +} + +// ServeHTTP handles connection using set of middleware and rr PSR-7 server. +func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { + target := &url.URL{ + Scheme: "https", + Host: s.tlsAddr(r.Host, false), + Path: r.URL.Path, + RawQuery: r.URL.RawQuery, + } + + http.Redirect(w, r, target.String(), http.StatusTemporaryRedirect) + return + } + + if s.https != nil && r.TLS != nil { + w.Header().Add("Strict-Transport-Security", "max-age=31536000; includeSubDomains; preload") + } + + r = attributes.Init(r) + + // chaining middleware + f := s.handler.ServeHTTP + for _, m := range s.mdwr { + f = m(f) + } + f(w, r) +} + +// append RootCA to the https server TLS config +func (s *Service) appendRootCa() error { + rootCAs, err := x509.SystemCertPool() + if err != nil { + s.throw(EventInitSSL, nil) + return nil + } + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + + CA, err := ioutil.ReadFile(s.cfg.SSL.RootCA) + if err != nil { + s.throw(EventInitSSL, nil) + return err + } + + // should append our CA cert + ok := rootCAs.AppendCertsFromPEM(CA) + if !ok { + return couldNotAppendPemError + } + config := &tls.Config{ + InsecureSkipVerify: false, + RootCAs: rootCAs, + } + s.http.TLSConfig = config + + return nil +} + +// Init https server +func (s *Service) initSSL() *http.Server { + var topCipherSuites []uint16 + var defaultCipherSuitesTLS13 []uint16 + + hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ + hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL + // Keep in sync with crypto/aes/cipher_s390x.go. + hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) + + hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X + + if hasGCMAsm { + // If AES-GCM hardware is provided then prioritise AES-GCM + // cipher suites. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } else { + // Without AES-GCM hardware, we put the ChaCha20-Poly1305 + // cipher suites first. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } + + DefaultCipherSuites := make([]uint16, 0, 22) + DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) + DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) + + server := &http.Server{ + Addr: s.tlsAddr(s.cfg.Address, true), + Handler: s, + TLSConfig: &tls.Config{ + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.CurveP384, + tls.CurveP521, + tls.X25519, + }, + CipherSuites: DefaultCipherSuites, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + }, + } + s.throw(EventInitSSL, server) + + return server +} + +// init http/2 server +func (s *Service) initHTTP2() error { + return http2.ConfigureServer(s.https, &http2.Server{ + MaxConcurrentStreams: s.cfg.HTTP2.MaxConcurrentStreams, + }) +} + +// serveFCGI starts FastCGI server. +func (s *Service) serveFCGI() error { + l, err := util.CreateListener(s.cfg.FCGI.Address) + if err != nil { + return err + } + + err = fcgi.Serve(l, s.fcgi.Handler) + if err != nil { + return err + } + + return nil +} + +// throw handles service, server and pool events. +func (s *Service) throw(event int, ctx interface{}) { + for _, l := range s.lsns { + l(event, ctx) + } + + if event == roadrunner.EventServerFailure { + // underlying rr server is dead + s.Stop() + } +} + +// tlsAddr replaces listen or host port with port configured by SSL config. +func (s *Service) tlsAddr(host string, forcePort bool) string { + // remove current forcePort first + host = strings.Split(host, ":")[0] + + if forcePort || s.cfg.SSL.Port != 443 { + host = fmt.Sprintf("%s:%v", host, s.cfg.SSL.Port) + } + + return host +} diff --git a/plugins/http/service_test.go b/plugins/http/service_test.go new file mode 100644 index 00000000..f7ee33cc --- /dev/null +++ b/plugins/http/service_test.go @@ -0,0 +1,759 @@ +package http + +import ( + "github.com/cenkalti/backoff/v4" + json "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/env" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "os" + "testing" + "time" +) + +type testCfg struct { + httpCfg string + rpcCfg string + envCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + if cfg.httpCfg == "" { + return nil + } + + return &testCfg{target: cfg.httpCfg} + } + + if name == rpc.ID { + return &testCfg{target: cfg.rpcCfg} + } + + if name == env.ID { + return &testCfg{target: cfg.envCfg} + } + + return nil +} +func (cfg *testCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + return j.Unmarshal([]byte(cfg.target), out) +} + +func Test_Service_NoConfig(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) + assert.Error(t, err) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusInactive, st) +} + +func Test_Service_Configure_Disable(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusInactive, st) +} + +func Test_Service_Configure_Enable(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":8070", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } + +} + +func Test_Service_Echo(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6536", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + + time.Sleep(time.Millisecond * 100) + + req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) + if err != nil { + c.Stop() + return err + } + + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + + c.Stop() + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func Test_Service_Env(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(env.ID, env.NewService(map[string]string{"rr": "test"})) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":10031", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php env pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`, envCfg: `{"env_key":"ENV_VALUE"}`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:10031", nil) + if err != nil { + c.Stop() + return err + } + + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "ENV_VALUE", string(b)) + + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + + c.Stop() + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } + +} + +func Test_Service_ErrorEcho(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6030", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echoerr pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + goterr := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventStderrOutput { + if string(ctx.([]byte)) == "WORLD\n" { + goterr <- nil + } + } + }) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) + if err != nil { + c.Stop() + return err + } + + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + + <-goterr + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + + c.Stop() + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func Test_Service_Middleware(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6032", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/halt" { + w.WriteHeader(500) + _, err := w.Write([]byte("halted")) + if err != nil { + t.Errorf("error writing the data to the http reply: error %v", err) + } + } else { + f(w, r) + } + } + }) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) + if err != nil { + c.Stop() + return err + } + + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + + req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) + if err != nil { + c.Stop() + return err + } + + r, err = http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + b, err = ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + + assert.Equal(t, 500, r.StatusCode) + assert.Equal(t, "halted", string(b)) + + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + c.Stop() + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } + +} + +func Test_Service_Listener(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6033", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + stop := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventServerStart { + stop <- nil + } + }) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + c.Stop() + assert.True(t, true) + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func Test_Service_Error(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6034", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "relay": "---", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + // assert error + err = c.Serve() + if err == nil { + return err + } + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func Test_Service_Error2(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6035", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + if err != nil { + return err + } + + // assert error + err = c.Serve() + if err == nil { + return err + } + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func Test_Service_Error3(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6036", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers" + "command": "php ../../tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + // assert error + if err == nil { + return err + } + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } + +} + +func Test_Service_Error4(t *testing.T) { + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": "----", + "maxRequestSize": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../tests/http/client.php broken pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`}) + // assert error + if err != nil { + return nil + } + + return err + }, bkoff) + + if err != nil { + t.Fatal(err) + } +} + +func tmpDir() string { + p := os.TempDir() + j := json.ConfigCompatibleWithStandardLibrary + r, _ := j.Marshal(p) + + return string(r) +} diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go new file mode 100644 index 00000000..cf147be9 --- /dev/null +++ b/plugins/http/ssl_test.go @@ -0,0 +1,254 @@ +package http + +import ( + "crypto/tls" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "testing" + "time" +) + +var sslClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, +} + +func Test_SSL_Service_Echo(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "address": ":6029", + "ssl": { + "port": 6900, + "key": "fixtures/server.key", + "cert": "fixtures/server.crt" + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil) + assert.NoError(t, err) + + r, err := sslClient.Do(req) + assert.NoError(t, err) + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err2 := r.Body.Close() + if err2 != nil { + t.Errorf("fail to close the Body: error %v", err2) + } + + c.Stop() +} + +func Test_SSL_Service_NoRedirect(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "address": ":6030", + "ssl": { + "port": 6901, + "key": "fixtures/server.key", + "cert": "fixtures/server.crt" + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) + assert.NoError(t, err) + + r, err := sslClient.Do(req) + assert.NoError(t, err) + + assert.Nil(t, r.TLS) + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err2 := r.Body.Close() + if err2 != nil { + t.Errorf("fail to close the Body: error %v", err2) + } + c.Stop() +} + +func Test_SSL_Service_Redirect(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "address": ":6831", + "ssl": { + "port": 6902, + "redirect": true, + "key": "fixtures/server.key", + "cert": "fixtures/server.crt" + }, + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil) + assert.NoError(t, err) + + r, err := sslClient.Do(req) + assert.NoError(t, err) + assert.NotNil(t, r.TLS) + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + err2 := r.Body.Close() + if err2 != nil { + t.Errorf("fail to close the Body: error %v", err2) + } + c.Stop() +} + +func Test_SSL_Service_Push(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "address": ":6032", + "ssl": { + "port": 6903, + "redirect": true, + "key": "fixtures/server.key", + "cert": "fixtures/server.crt" + }, + "workers":{ + "command": "php ../../tests/http/client.php push pipes", + "pool": {"numWorkers": 1} + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil) + assert.NoError(t, err) + + r, err := sslClient.Do(req) + assert.NoError(t, err) + + assert.NotNil(t, r.TLS) + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.Equal(t, "", r.Header.Get("Http2-Push")) + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + + + err2 := r.Body.Close() + if err2 != nil { + t.Errorf("fail to close the Body: error %v", err2) + } + c.Stop() +} diff --git a/plugins/http/uploads.go b/plugins/http/uploads.go new file mode 100644 index 00000000..39a9eaf2 --- /dev/null +++ b/plugins/http/uploads.go @@ -0,0 +1,160 @@ +package http + +import ( + "fmt" + json "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "io" + "io/ioutil" + "mime/multipart" + "os" + "sync" +) + +const ( + // UploadErrorOK - no error, the file uploaded with success. + UploadErrorOK = 0 + + // UploadErrorNoFile - no file was uploaded. + UploadErrorNoFile = 4 + + // UploadErrorNoTmpDir - missing a temporary folder. + UploadErrorNoTmpDir = 5 + + // UploadErrorCantWrite - failed to write file to disk. + UploadErrorCantWrite = 6 + + // UploadErrorExtension - forbidden file extension. + UploadErrorExtension = 7 +) + +// Uploads tree manages uploaded files tree and temporary files. +type Uploads struct { + // associated temp directory and forbidden extensions. + cfg *UploadsConfig + + // pre processed data tree for Uploads. + tree fileTree + + // flat list of all file Uploads. + list []*FileUpload +} + +// MarshalJSON marshal tree tree into JSON. +func (u *Uploads) MarshalJSON() ([]byte, error) { + j := json.ConfigCompatibleWithStandardLibrary + return j.Marshal(u.tree) +} + +// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors +// will be handled individually. +func (u *Uploads) Open(log *logrus.Logger) { + var wg sync.WaitGroup + for _, f := range u.list { + wg.Add(1) + go func(f *FileUpload) { + defer wg.Done() + err := f.Open(u.cfg) + if err != nil && log != nil { + log.Error(fmt.Errorf("error opening the file: error %v", err)) + } + }(f) + } + + wg.Wait() +} + +// Clear deletes all temporary files. +func (u *Uploads) Clear(log *logrus.Logger) { + for _, f := range u.list { + if f.TempFilename != "" && exists(f.TempFilename) { + err := os.Remove(f.TempFilename) + if err != nil && log != nil { + log.Error(fmt.Errorf("error removing the file: error %v", err)) + } + } + } +} + +// FileUpload represents singular file NewUpload. +type FileUpload struct { + // ID contains filename specified by the client. + Name string `json:"name"` + + // Mime contains mime-type provided by the client. + Mime string `json:"mime"` + + // Size of the uploaded file. + Size int64 `json:"size"` + + // Error indicates file upload error (if any). See http://php.net/manual/en/features.file-upload.errors.php + Error int `json:"error"` + + // TempFilename points to temporary file location. + TempFilename string `json:"tmpName"` + + // associated file header + header *multipart.FileHeader +} + +// NewUpload wraps net/http upload into PRS-7 compatible structure. +func NewUpload(f *multipart.FileHeader) *FileUpload { + return &FileUpload{ + Name: f.Filename, + Mime: f.Header.Get("Content-Type"), + Error: UploadErrorOK, + header: f, + } +} + +// Open moves file content into temporary file available for PHP. +// NOTE: +// There is 2 deferred functions, and in case of getting 2 errors from both functions +// error from close of temp file would be overwritten by error from the main file +// STACK +// DEFER FILE CLOSE (2) +// DEFER TMP CLOSE (1) +func (f *FileUpload) Open(cfg *UploadsConfig) (err error) { + if cfg.Forbids(f.Name) { + f.Error = UploadErrorExtension + return nil + } + + file, err := f.header.Open() + if err != nil { + f.Error = UploadErrorNoFile + return err + } + + defer func() { + // close the main file + err = file.Close() + }() + + tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") + if err != nil { + // most likely cause of this issue is missing tmp dir + f.Error = UploadErrorNoTmpDir + return err + } + + f.TempFilename = tmp.Name() + defer func() { + // close the temp file + err = tmp.Close() + }() + + if f.Size, err = io.Copy(tmp, file); err != nil { + f.Error = UploadErrorCantWrite + } + + return err +} + +// exists if file exists. +func exists(path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } + return true +} diff --git a/plugins/http/uploads_config.go b/plugins/http/uploads_config.go new file mode 100644 index 00000000..3f655064 --- /dev/null +++ b/plugins/http/uploads_config.go @@ -0,0 +1,45 @@ +package http + +import ( + "os" + "path" + "strings" +) + +// UploadsConfig describes file location and controls access to them. +type UploadsConfig struct { + // Dir contains name of directory to control access to. + Dir string + + // Forbid specifies list of file extensions which are forbidden for access. + // Example: .php, .exe, .bat, .htaccess and etc. + Forbid []string +} + +// InitDefaults sets missing values to their default values. +func (cfg *UploadsConfig) InitDefaults() error { + cfg.Forbid = []string{".php", ".exe", ".bat"} + return nil +} + +// TmpDir returns temporary directory. +func (cfg *UploadsConfig) TmpDir() string { + if cfg.Dir != "" { + return cfg.Dir + } + + return os.TempDir() +} + +// Forbids must return true if file extension is not allowed for the upload. +func (cfg *UploadsConfig) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range cfg.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/plugins/http/uploads_config_test.go b/plugins/http/uploads_config_test.go new file mode 100644 index 00000000..2b6ceebc --- /dev/null +++ b/plugins/http/uploads_config_test.go @@ -0,0 +1,24 @@ +package http + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestFsConfig_Forbids(t *testing.T) { + cfg := UploadsConfig{Forbid: []string{".php"}} + + assert.True(t, cfg.Forbids("index.php")) + assert.True(t, cfg.Forbids("index.PHP")) + assert.True(t, cfg.Forbids("phpadmin/index.bak.php")) + assert.False(t, cfg.Forbids("index.html")) +} + +func TestFsConfig_TmpFallback(t *testing.T) { + cfg := UploadsConfig{Dir: "test"} + assert.Equal(t, "test", cfg.TmpDir()) + + cfg = UploadsConfig{Dir: ""} + assert.Equal(t, os.TempDir(), cfg.TmpDir()) +} diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go new file mode 100644 index 00000000..08177c72 --- /dev/null +++ b/plugins/http/uploads_test.go @@ -0,0 +1,434 @@ +package http + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/hex" + "fmt" + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner" + "github.com/stretchr/testify/assert" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "os" + "testing" + "time" +) + +func TestHandler_Upload_File(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 0, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + +func TestHandler_Upload_NestedFile(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() + fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 0, "application/octet-stream") + + assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) +} + +func TestHandler_Upload_File_NoTmpDir(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: "-----", + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 5, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + +func TestHandler_Upload_File_Forbids(t *testing.T) { + h := &Handler{ + cfg: &Config{ + MaxRequestSize: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{".go"}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php upload pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8021", Handler: h} + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + var mb bytes.Buffer + w := multipart.NewWriter(&mb) + + f := mustOpen("uploads_test.go") + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() + fw, err := w.CreateFormFile("upload", f.Name()) + assert.NotNil(t, fw) + assert.NoError(t, err) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } + + req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) + assert.NoError(t, err) + + req.Header.Set("Content-Type", w.FormDataContentType()) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + fs := fileString("uploads_test.go", 7, "application/octet-stream") + + assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +} + +func Test_FileExists(t *testing.T) { + assert.True(t, exists("uploads_test.go")) + assert.False(t, exists("uploads_test.")) +} + +func mustOpen(f string) *os.File { + r, err := os.Open(f) + if err != nil { + panic(err) + } + return r +} + +type fInfo struct { + Name string `json:"name"` + Size int64 `json:"size"` + Mime string `json:"mime"` + Error int `json:"error"` + MD5 string `json:"md5,omitempty"` +} + +func fileString(f string, errNo int, mime string) string { + s, err := os.Stat(f) + if err != nil { + fmt.Println(fmt.Errorf("error stat the file, error: %v", err)) + } + + ff, err := os.Open(f) + if err != nil { + fmt.Println(fmt.Errorf("error opening the file, error: %v", err)) + } + + defer func() { + er := ff.Close() + if er != nil { + fmt.Println(fmt.Errorf("error closing the file, error: %v", er)) + } + }() + + h := md5.New() + _, err = io.Copy(h, ff) + if err != nil { + fmt.Println(fmt.Errorf("error copying the file, error: %v", err)) + } + + v := &fInfo{ + Name: s.Name(), + Size: s.Size(), + Error: errNo, + Mime: mime, + MD5: hex.EncodeToString(h.Sum(nil)), + } + + if errNo != 0 { + v.MD5 = "" + v.Size = 0 + } + + j := json.ConfigCompatibleWithStandardLibrary + r, err := j.Marshal(v) + if err != nil { + fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err)) + } + return string(r) + +} |