diff options
-rwxr-xr-x | go.mod | 3 | ||||
-rwxr-xr-x | go.sum | 1 | ||||
-rw-r--r-- | interfaces/server/interface.go | 4 | ||||
-rw-r--r-- | plugins/http/attributes/attributes_test.go | 1 | ||||
-rw-r--r-- | plugins/http/config.go | 4 | ||||
-rw-r--r-- | plugins/http/handler_test.go | 166 | ||||
-rw-r--r-- | plugins/http/plugin.go | 144 | ||||
-rw-r--r-- | plugins/http/plugin_test.go | 1514 | ||||
-rw-r--r-- | plugins/http/response.go | 2 | ||||
-rw-r--r-- | plugins/http/response_test.go | 16 | ||||
-rw-r--r-- | plugins/http/rpc_test.go | 439 | ||||
-rw-r--r-- | plugins/http/ssl_test.go | 505 | ||||
-rw-r--r-- | plugins/http/uploads_test.go | 865 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 4 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go | 4 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go | 4 | ||||
-rw-r--r-- | src/Exception/RoadRunnerException.php | 2 | ||||
-rw-r--r-- | src/Http/HttpClient.php | 75 | ||||
-rw-r--r-- | src/Http/PSR7Client.php | 217 | ||||
-rw-r--r-- | src/Logger/.empty | 0 | ||||
-rw-r--r-- | src/Metrics/Metrics.php | 80 | ||||
-rw-r--r-- | src/Metrics/MetricsInterface.php | 64 | ||||
-rw-r--r-- | src/WorkerInterface.php | 0 |
23 files changed, 2263 insertions, 1851 deletions
@@ -3,16 +3,19 @@ module github.com/spiral/roadrunner/v2 go 1.15 require ( + github.com/cenkalti/backoff/v4 v4.1.0 github.com/fatih/color v1.10.0 github.com/hashicorp/go-multierror v1.0.0 github.com/json-iterator/go v1.1.10 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 github.com/shirou/gopsutil v3.20.10+incompatible + github.com/sirupsen/logrus v1.6.0 github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta19 github.com/spiral/errors v1.0.4 github.com/spiral/goridge/v2 v2.4.6 + github.com/spiral/roadrunner v1.8.4 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a go.uber.org/multierr v1.6.0 @@ -238,6 +238,7 @@ github.com/shirou/gopsutil v3.20.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= diff --git a/interfaces/server/interface.go b/interfaces/server/interface.go index 51d172cb..2dae30c5 100644 --- a/interfaces/server/interface.go +++ b/interfaces/server/interface.go @@ -9,8 +9,8 @@ import ( type Env map[string]string -// WorkerFactory creates workers for the application. -type WorkerFactory interface { +// Server creates workers for the application. +type Server interface { CmdFactory(env Env) (func() *exec.Cmd, error) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env Env) (roadrunner.Pool, error) diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go index d914f6fa..a4c85eea 100644 --- a/plugins/http/attributes/attributes_test.go +++ b/plugins/http/attributes/attributes_test.go @@ -71,7 +71,6 @@ func TestSetAttribute(t *testing.T) { func TestSetAttributeNone(t *testing.T) { r := &http.Request{} - err := Set(r, "key", "value") if err != nil { t.Errorf("error during the Set: error %v", err) diff --git a/plugins/http/config.go b/plugins/http/config.go index 7922f485..b3f4ca13 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -19,7 +19,7 @@ type ServerConfig struct { User string // Relay defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" + // "pipes", "tcp://:6001", "unix://pool.sock" // This config section must not change on re-configuration. Relay string @@ -111,7 +111,7 @@ func (c *Config) EnableHTTP() bool { return c.Address != "" } -// EnableTLS returns true if rr must listen TLS connections. +// EnableTLS returns true if pool must listen TLS connections. func (c *Config) EnableTLS() bool { return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != "" } diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go index d9726e81..d15cf96f 100644 --- a/plugins/http/handler_test.go +++ b/plugins/http/handler_test.go @@ -73,7 +73,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echo pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -84,8 +84,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -118,7 +118,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echo pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -145,7 +145,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echo pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -174,7 +174,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php header pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -185,8 +185,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8078", Handler: h} // defer func() { @@ -237,7 +237,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php user-agent pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -248,8 +248,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8088", Handler: h} // defer func() { @@ -299,7 +299,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php user-agent pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -310,8 +310,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8088", Handler: h} // defer func() { @@ -361,7 +361,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php cookie pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -372,8 +372,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8079", Handler: h} // defer func() { @@ -428,7 +428,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php payload pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -439,8 +439,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8090", Handler: h} // defer func() { @@ -494,7 +494,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php payload pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -505,8 +505,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8081", Handler: h} // defer func() { @@ -555,7 +555,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php payload pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -566,8 +566,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8082", Handler: h} // defer func() { @@ -617,7 +617,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -628,8 +628,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8083", Handler: h} // defer func() { @@ -691,7 +691,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -702,8 +702,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8083", Handler: h} // defer func() { @@ -759,7 +759,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -770,8 +770,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8083", Handler: h} // defer func() { @@ -833,7 +833,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -844,8 +844,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8084", Handler: h} // defer func() { @@ -907,7 +907,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -918,8 +918,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8085", Handler: h} // defer func() { @@ -981,7 +981,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -992,8 +992,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8019", Handler: h} // defer func() { @@ -1097,7 +1097,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1108,8 +1108,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8020", Handler: h} // defer func() { @@ -1213,7 +1213,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php data pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1224,8 +1224,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8021", Handler: h} // defer func() { @@ -1331,7 +1331,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php error pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1342,8 +1342,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1375,7 +1375,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php error2 pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1386,8 +1386,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1419,7 +1419,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php pid pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1430,8 +1430,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1480,7 +1480,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echo pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1491,8 +1491,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1539,7 +1539,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echoDelay pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1550,8 +1550,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1598,7 +1598,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php error pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1609,8 +1609,8 @@ package http // }), // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { @@ -1665,7 +1665,7 @@ package http // "fe80::/10", // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php ip pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1681,8 +1681,8 @@ package http // t.Errorf("error parsing CIDRs: error %v", err) // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} // defer func() { @@ -1724,7 +1724,7 @@ package http // "fe80::/10", // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php ip pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1740,8 +1740,8 @@ package http // t.Errorf("error parsing CIDRs: error %v", err) // } // -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} // defer func() { @@ -1788,7 +1788,7 @@ package http // "fe80::/10", // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php ip pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1803,8 +1803,8 @@ package http // if err != nil { // t.Errorf("error parsing CIDRs: error %v", err) // } -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} // defer func() { @@ -1851,7 +1851,7 @@ package http // "10.0.0.0/8", // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php ip pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1866,8 +1866,8 @@ package http // if err != nil { // t.Errorf("error parsing CIDRs: error %v", err) // } -// assert.NoError(t, h.rr.Start()) -// defer h.rr.Stop() +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() // // hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} // defer func() { @@ -1903,7 +1903,7 @@ package http // Forbid: []string{}, // }, // }, -// rr: roadrunner.NewServer(&roadrunner.ServerConfig{ +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ // Command: "php ../../tests/http/client.php echo pipes", // Relay: "pipes", // Pool: &roadrunner.Config{ @@ -1914,11 +1914,11 @@ package http // }), // } // -// err := h.rr.Start() +// err := h.pool.Start() // if err != nil { // b.Errorf("error starting the worker pool: error %v", err) // } -// defer h.rr.Stop() +// defer h.pool.Stop() // // hs := &http.Server{Addr: ":8177", Handler: h} // defer func() { diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 94b6c74b..581455b3 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -37,20 +37,20 @@ const ( // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc -// Service manages rr, http servers. +// Service manages pool, http servers. type Plugin struct { sync.Mutex sync.WaitGroup - cfg *Config - log log.Logger + cfg *Config + configurer config.Configurer + log log.Logger - //cprod roadrunner.CommandProducer - env map[string]string - lsns []func(event int, ctx interface{}) - mdwr []middleware + mdwr []middleware + listeners []util.EventListener - rr roadrunner.Pool + pool roadrunner.Pool + server factory.Server //controller roadrunner.Controller handler *Handler @@ -59,35 +59,29 @@ type Plugin struct { fcgi *http.Server } -//// Attach attaches controller. Currently only one controller is supported. -//func (s *Service) Attach(w roadrunner.Controller) { -// s.controller = w -//} -// -//// ProduceCommands changes the default command generator method -//func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { -// s.cprod = producer -//} - // AddMiddleware adds new net/http mdwr. func (s *Plugin) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) } // AddListener attaches server event controller. -func (s *Plugin) AddListener(l func(event int, ctx interface{})) { - s.lsns = append(s.lsns, l) +func (s *Plugin) AddListener(listener util.EventListener) { + // save listeners for Reset + s.listeners = append(s.listeners, listener) + s.pool.AddListener(listener) } // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(ServiceName, &s.cfg) if err != nil { return errors.E(op, err) } + s.configurer = cfg + s.listeners = make([]util.EventListener, 0, 1) s.log = log // Set needed env vars @@ -107,7 +101,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Work return errors.E(op, err) } - s.rr = p + s.pool = p //if r != nil { // if err := r.Register(ID, &rpcServer{s}); err != nil { @@ -125,6 +119,8 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Work // Serve serves the svc. func (s *Plugin) Serve() chan error { s.Lock() + defer s.Unlock() + const op = errors.Op("serve http") errCh := make(chan error, 2) @@ -137,14 +133,14 @@ func (s *Plugin) Serve() chan error { //s.cfg.Workers.CommandProducer = s.cprod //s.cfg.Workers.SetEnv("RR_HTTP", "true") // - //s.rr = roadrunner.NewServer(s.cfg.Workers) - //s.rr.Listen(s.throw) + //s.pool = roadrunner.NewServer(s.cfg.Workers) + //s.pool.Listen(s.throw) // //if s.controller != nil { - // s.rr.Attach(s.controller) + // s.pool.Attach(s.controller) //} - s.handler = &Handler{cfg: s.cfg, rr: s.rr} + s.handler = &Handler{cfg: s.cfg, rr: s.pool} //s.handler.Listen(s.throw) if s.cfg.EnableHTTP() { @@ -177,12 +173,10 @@ func (s *Plugin) Serve() chan error { s.fcgi = &http.Server{Handler: s} } - s.Unlock() - - //if err := s.rr.Start(); err != nil { + //if err := s.pool.Start(); err != nil { // return err //} - //defer s.rr.Stop() + //defer s.pool.Stop() if s.http != nil { go func() { @@ -260,7 +254,7 @@ func (s *Plugin) Stop() error { return err } -// ServeHTTP handles connection using set of middleware and rr PSR-7 server. +// ServeHTTP handles connection using set of middleware and pool PSR-7 server. func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { target := &url.URL{ @@ -385,7 +379,6 @@ func (s *Plugin) initSSL() *http.Server { PreferServerCipherSuites: true, }, } - //s.throw(EventInitSSL, server) return server } @@ -419,7 +412,7 @@ func (s *Plugin) serveFCGI() error { // } // // if event == roadrunner.EventServerFailure { -// // underlying rr server is dead +// // underlying pool server is dead // s.Stop() // } //} @@ -436,65 +429,42 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string { return host } -// Server returns associated rr workers +// Server returns associated pool workers func (s *Plugin) Workers() []roadrunner.WorkerBase { - return s.rr.Workers() + return s.pool.Workers() } func (s *Plugin) Reset() error { + s.Lock() + defer s.Unlock() + s.pool.Destroy(context.Background()) + + // Set needed env vars + env := make(map[string]string) + env["RR_HTTP"] = "true" + var err error + // re-read the config - // destroy the pool - // attach new one + err = s.configurer.UnmarshalKey(ServiceName, &s.cfg) + if err != nil { + return err + } - //s.mup.Lock() - //defer s.mup.Unlock() - // - //s.mu.Lock() - //if !s.started { - // s.cfg = cfg - // s.mu.Unlock() - // return nil - //} - //s.mu.Unlock() - // - //if s.cfg.Differs(cfg) { - // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)") - //} - // - //s.mu.Lock() - //previous := s.pool - //pWatcher := s.pController - //s.mu.Unlock() - // - //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool) - //if err != nil { - // return err - //} - // - //pool.Listen(s.poolListener) - // - //s.mu.Lock() - //s.cfg.Pool, s.pool = cfg.Pool, pool - // - //if s.controller != nil { - // s.pController = s.controller.Attach(pool) - //} - // - //s.mu.Unlock() - // - //s.throw(EventPoolConstruct, pool) - // - //if previous != nil { - // go func(previous Pool, pWatcher Controller) { - // s.throw(EventPoolDestruct, previous) - // if pWatcher != nil { - // pWatcher.Detach() - // } - // - // previous.Destroy() - // }(previous, pWatcher) - //} - // - //return nil + s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{ + Debug: false, + NumWorkers: 0, + MaxJobs: 0, + AllocateTimeout: 0, + DestroyTimeout: 0, + Supervisor: nil, + }, env) + if err != nil { + return err + } + + // restore original listeners + for i := 0; i < len(s.listeners); i++ { + s.pool.AddListener(s.listeners[i]) + } return nil } diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go index f7ee33cc..012cea04 100644 --- a/plugins/http/plugin_test.go +++ b/plugins/http/plugin_test.go @@ -1,759 +1,759 @@ package http -import ( - "github.com/cenkalti/backoff/v4" - json "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/env" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http" - "os" - "testing" - "time" -) - -type testCfg struct { - httpCfg string - rpcCfg string - envCfg string - target string -} - -func (cfg *testCfg) Get(name string) service.Config { - if name == ID { - if cfg.httpCfg == "" { - return nil - } - - return &testCfg{target: cfg.httpCfg} - } - - if name == rpc.ID { - return &testCfg{target: cfg.rpcCfg} - } - - if name == env.ID { - return &testCfg{target: cfg.envCfg} - } - - return nil -} -func (cfg *testCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.target), out) -} - -func Test_Service_NoConfig(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) - assert.Error(t, err) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusInactive, st) -} - -func Test_Service_Configure_Disable(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{})) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusInactive, st) -} - -func Test_Service_Configure_Enable(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":8070", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_Echo(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6536", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - - time.Sleep(time.Millisecond * 100) - - req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) - if err != nil { - c.Stop() - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - return err - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - c.Stop() - return err - } - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - err = r.Body.Close() - if err != nil { - c.Stop() - return err - } - - c.Stop() - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Env(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(env.ID, env.NewService(map[string]string{"rr": "test"})) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":10031", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php env pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`, envCfg: `{"env_key":"ENV_VALUE"}`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "http://localhost:10031", nil) - if err != nil { - c.Stop() - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - return err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - c.Stop() - return err - } - - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "ENV_VALUE", string(b)) - - err = r.Body.Close() - if err != nil { - c.Stop() - return err - } - - c.Stop() - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_ErrorEcho(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6030", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echoerr pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - goterr := make(chan interface{}) - s.(*Service).AddListener(func(event int, ctx interface{}) { - if event == roadrunner.EventStderrOutput { - if string(ctx.([]byte)) == "WORLD\n" { - goterr <- nil - } - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) - if err != nil { - c.Stop() - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - return err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - c.Stop() - return err - } - - <-goterr - - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - err = r.Body.Close() - if err != nil { - c.Stop() - return err - } - - c.Stop() - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Middleware(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6032", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/halt" { - w.WriteHeader(500) - _, err := w.Write([]byte("halted")) - if err != nil { - t.Errorf("error writing the data to the http reply: error %v", err) - } - } else { - f(w, r) - } - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) - if err != nil { - c.Stop() - return err - } - - r, err := http.DefaultClient.Do(req) - if err != nil { - c.Stop() - return err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - c.Stop() - return err - } - - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - err = r.Body.Close() - if err != nil { - c.Stop() - return err - } - - req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) - if err != nil { - c.Stop() - return err - } - - r, err = http.DefaultClient.Do(req) - if err != nil { - c.Stop() - return err - } - b, err = ioutil.ReadAll(r.Body) - if err != nil { - c.Stop() - return err - } - - assert.Equal(t, 500, r.StatusCode) - assert.Equal(t, "halted", string(b)) - - err = r.Body.Close() - if err != nil { - c.Stop() - return err - } - c.Stop() - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_Listener(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6033", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - stop := make(chan interface{}) - s.(*Service).AddListener(func(event int, ctx interface{}) { - if event == roadrunner.EventServerStart { - stop <- nil - } - }) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - time.Sleep(time.Millisecond * 500) - - c.Stop() - assert.True(t, true) - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Error(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6034", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "relay": "---", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - // assert error - err = c.Serve() - if err == nil { - return err - } - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Error2(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6035", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php broken pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - if err != nil { - return err - } - - // assert error - err = c.Serve() - if err == nil { - return err - } - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func Test_Service_Error3(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": ":6036", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers" - "command": "php ../../tests/http/client.php broken pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - // assert error - if err == nil { - return err - } - - return nil - }, bkoff) - - if err != nil { - t.Fatal(err) - } - -} - -func Test_Service_Error4(t *testing.T) { - bkoff := backoff.NewExponentialBackOff() - bkoff.MaxElapsedTime = time.Second * 15 - - err := backoff.Retry(func() error { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - err := c.Init(&testCfg{httpCfg: `{ - "enable": true, - "address": "----", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php broken pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`}) - // assert error - if err != nil { - return nil - } - - return err - }, bkoff) - - if err != nil { - t.Fatal(err) - } -} - -func tmpDir() string { - p := os.TempDir() - j := json.ConfigCompatibleWithStandardLibrary - r, _ := j.Marshal(p) - - return string(r) -} +//import ( +// "github.com/cenkalti/backoff/v4" +// json "github.com/json-iterator/go" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner" +// "github.com/spiral/roadrunner/service" +// "github.com/spiral/roadrunner/service/env" +// "github.com/spiral/roadrunner/service/rpc" +// "github.com/stretchr/testify/assert" +// "io/ioutil" +// "net/http" +// "os" +// "testing" +// "time" +//) +// +//type testCfg struct { +// httpCfg string +// rpcCfg string +// envCfg string +// target string +//} +// +//func (cfg *testCfg) Get(name string) service.Config { +// if name == ID { +// if cfg.httpCfg == "" { +// return nil +// } +// +// return &testCfg{target: cfg.httpCfg} +// } +// +// if name == rpc.ID { +// return &testCfg{target: cfg.rpcCfg} +// } +// +// if name == env.ID { +// return &testCfg{target: cfg.envCfg} +// } +// +// return nil +//} +//func (cfg *testCfg) Unmarshal(out interface{}) error { +// j := json.ConfigCompatibleWithStandardLibrary +// return j.Unmarshal([]byte(cfg.target), out) +//} +// +//func Test_Service_NoConfig(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) +// assert.Error(t, err) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusInactive, st) +//} +// +//func Test_Service_Configure_Disable(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusInactive, st) +//} +// +//func Test_Service_Configure_Enable(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":8070", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Echo(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6536", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 100) +// +// req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Env(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(env.ID, env.NewService(map[string]string{"pool": "test"})) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":10031", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php env pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`, envCfg: `{"env_key":"ENV_VALUE"}`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:10031", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 200, r.StatusCode) +// assert.Equal(t, "ENV_VALUE", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_ErrorEcho(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6030", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echoerr pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// goterr := make(chan interface{}) +// s.(*Service).AddListener(func(event int, ctx interface{}) { +// if event == roadrunner.EventStderrOutput { +// if string(ctx.([]byte)) == "WORLD\n" { +// goterr <- nil +// } +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// <-goterr +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// c.Stop() +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Middleware(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6032", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { +// return func(w http.ResponseWriter, r *http.Request) { +// if r.URL.Path == "/halt" { +// w.WriteHeader(500) +// _, err := w.Write([]byte("halted")) +// if err != nil { +// t.Errorf("error writing the data to the http reply: error %v", err) +// } +// } else { +// f(w, r) +// } +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err := http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// +// b, err := ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// +// req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) +// if err != nil { +// c.Stop() +// return err +// } +// +// r, err = http.DefaultClient.Do(req) +// if err != nil { +// c.Stop() +// return err +// } +// b, err = ioutil.ReadAll(r.Body) +// if err != nil { +// c.Stop() +// return err +// } +// +// assert.Equal(t, 500, r.StatusCode) +// assert.Equal(t, "halted", string(b)) +// +// err = r.Body.Close() +// if err != nil { +// c.Stop() +// return err +// } +// c.Stop() +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Listener(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6033", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// stop := make(chan interface{}) +// s.(*Service).AddListener(func(event int, ctx interface{}) { +// if event == roadrunner.EventServerStart { +// stop <- nil +// } +// }) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("serve error: %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// c.Stop() +// assert.True(t, true) +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6034", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "relay": "---", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// // assert error +// err = c.Serve() +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error2(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6035", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// if err != nil { +// return err +// } +// +// // assert error +// err = c.Serve() +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func Test_Service_Error3(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": ":6036", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers" +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// // assert error +// if err == nil { +// return err +// } +// +// return nil +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +// +//} +// +//func Test_Service_Error4(t *testing.T) { +// bkoff := backoff.NewExponentialBackOff() +// bkoff.MaxElapsedTime = time.Second * 15 +// +// err := backoff.Retry(func() error { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// err := c.Init(&testCfg{httpCfg: `{ +// "enable": true, +// "address": "----", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php broken pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`}) +// // assert error +// if err != nil { +// return nil +// } +// +// return err +// }, bkoff) +// +// if err != nil { +// t.Fatal(err) +// } +//} +// +//func tmpDir() string { +// p := os.TempDir() +// j := json.ConfigCompatibleWithStandardLibrary +// r, _ := j.Marshal(p) +// +// return string(r) +//} diff --git a/plugins/http/response.go b/plugins/http/response.go index c3de434f..9ebc0632 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -21,7 +21,7 @@ type Response struct { body interface{} } -// NewResponse creates new response based on given rr payload. +// NewResponse creates new response based on given pool payload. func NewResponse(p roadrunner.Payload) (*Response, error) { r := &Response{body: p.Body} j := json.ConfigCompatibleWithStandardLibrary diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go index 1f394276..b5adbad9 100644 --- a/plugins/http/response_test.go +++ b/plugins/http/response_test.go @@ -6,7 +6,7 @@ import ( "net/http" "testing" - "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/v2" "github.com/stretchr/testify/assert" ) @@ -44,13 +44,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)}) + r, err := NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -67,7 +67,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -91,7 +91,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -111,7 +111,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -126,7 +126,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -145,7 +145,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := NewResponse(&roadrunner.Payload{ + r, err := NewResponse(roadrunner.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go index e57a8699..86499d46 100644 --- a/plugins/http/rpc_test.go +++ b/plugins/http/rpc_test.go @@ -1,221 +1,222 @@ package http -import ( - json "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" - "os" - "strconv" - "testing" - "time" -) - -func Test_RPC(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rpc.ID, &rpc.Service{}) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, - httpCfg: `{ - "enable": true, - "address": ":16031", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`})) - - s, _ := c.Get(ID) - ss := s.(*Service) - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Second) - - res, _, err := get("http://localhost:16031") - if err != nil { - t.Fatal(err) - } - assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) - - cl, err := rs.Client() - assert.NoError(t, err) - - r := "" - assert.NoError(t, cl.Call("http.Reset", true, &r)) - assert.Equal(t, "OK", r) - - res2, _, err := get("http://localhost:16031") - if err != nil { - t.Fatal(err) - } - assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) - assert.NotEqual(t, res, res2) - c.Stop() -} - -func Test_RPC_Unix(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rpc.ID, &rpc.Service{}) - c.Register(ID, &Service{}) - - sock := `unix://` + os.TempDir() + `/rpc.unix` - j := json.ConfigCompatibleWithStandardLibrary - data, _ := j.Marshal(sock) - - assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":` + string(data) + `}`, - httpCfg: `{ - "enable": true, - "address": ":6032", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`})) - - s, _ := c.Get(ID) - ss := s.(*Service) - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 500) - - res, _, err := get("http://localhost:6032") - if err != nil { - c.Stop() - t.Fatal(err) - } - if ss.rr.Workers() != nil && len(ss.rr.Workers()) > 0 { - assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) - } else { - c.Stop() - t.Fatal("no workers initialized") - } - - cl, err := rs.Client() - if err != nil { - c.Stop() - t.Fatal(err) - } - - r := "" - assert.NoError(t, cl.Call("http.Reset", true, &r)) - assert.Equal(t, "OK", r) - - res2, _, err := get("http://localhost:6032") - if err != nil { - c.Stop() - t.Fatal(err) - } - assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) - assert.NotEqual(t, res, res2) - c.Stop() -} - -func Test_Workers(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rpc.ID, &rpc.Service{}) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, - httpCfg: `{ - "enable": true, - "address": ":6033", - "maxRequestSize": 1024, - "uploads": { - "dir": ` + tmpDir() + `, - "forbid": [] - }, - "workers":{ - "command": "php ../../tests/http/client.php pid pipes", - "relay": "pipes", - "pool": { - "numWorkers": 1, - "allocateTimeout": 10000000, - "destroyTimeout": 10000000 - } - } - }`})) - - s, _ := c.Get(ID) - ss := s.(*Service) - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 500) - - cl, err := rs.Client() - assert.NoError(t, err) - - r := &WorkerList{} - assert.NoError(t, cl.Call("http.Workers", true, &r)) - assert.Len(t, r.Workers, 1) - - assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid) - c.Stop() -} - -func Test_Errors(t *testing.T) { - r := &rpcServer{nil} - - assert.Error(t, r.Reset(true, nil)) - assert.Error(t, r.Workers(true, nil)) -} +// +//import ( +// json "github.com/json-iterator/go" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner/service" +// "github.com/spiral/roadrunner/service/rpc" +// "github.com/stretchr/testify/assert" +// "os" +// "strconv" +// "testing" +// "time" +//) +// +//func Test_RPC(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, +// httpCfg: `{ +// "enable": true, +// "address": ":16031", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Second) +// +// res, _, err := get("http://localhost:16031") +// if err != nil { +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res) +// +// cl, err := rs.Client() +// assert.NoError(t, err) +// +// r := "" +// assert.NoError(t, cl.Call("http.Reset", true, &r)) +// assert.Equal(t, "OK", r) +// +// res2, _, err := get("http://localhost:16031") +// if err != nil { +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2) +// assert.NotEqual(t, res, res2) +// c.Stop() +//} +// +//func Test_RPC_Unix(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// sock := `unix://` + os.TempDir() + `/rpc.unix` +// j := json.ConfigCompatibleWithStandardLibrary +// data, _ := j.Marshal(sock) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":` + string(data) + `}`, +// httpCfg: `{ +// "enable": true, +// "address": ":6032", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// res, _, err := get("http://localhost:6032") +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// if ss.pool.Workers() != nil && len(ss.pool.Workers()) > 0 { +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res) +// } else { +// c.Stop() +// t.Fatal("no workers initialized") +// } +// +// cl, err := rs.Client() +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// +// r := "" +// assert.NoError(t, cl.Call("http.Reset", true, &r)) +// assert.Equal(t, "OK", r) +// +// res2, _, err := get("http://localhost:6032") +// if err != nil { +// c.Stop() +// t.Fatal(err) +// } +// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2) +// assert.NotEqual(t, res, res2) +// c.Stop() +//} +// +//func Test_Workers(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Service{}) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, +// httpCfg: `{ +// "enable": true, +// "address": ":6033", +// "maxRequestSize": 1024, +// "uploads": { +// "dir": ` + tmpDir() + `, +// "forbid": [] +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php pid pipes", +// "relay": "pipes", +// "pool": { +// "numWorkers": 1, +// "allocateTimeout": 10000000, +// "destroyTimeout": 10000000 +// } +// } +// }`})) +// +// s, _ := c.Get(ID) +// ss := s.(*Service) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Service) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// cl, err := rs.Client() +// assert.NoError(t, err) +// +// r := &WorkerList{} +// assert.NoError(t, cl.Call("http.Workers", true, &r)) +// assert.Len(t, r.Workers, 1) +// +// assert.Equal(t, *ss.pool.Workers()[0].Pid, r.Workers[0].Pid) +// c.Stop() +//} +// +//func Test_Errors(t *testing.T) { +// r := &rpcServer{nil} +// +// assert.Error(t, r.Reset(true, nil)) +// assert.Error(t, r.Workers(true, nil)) +//} diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go index cf147be9..df09aef5 100644 --- a/plugins/http/ssl_test.go +++ b/plugins/http/ssl_test.go @@ -1,254 +1,255 @@ package http -import ( - "crypto/tls" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http" - "testing" - "time" -) - -var sslClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, -} - -func Test_SSL_Service_Echo(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ - "address": ":6029", - "ssl": { - "port": 6900, - "key": "fixtures/server.key", - "cert": "fixtures/server.crt" - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "pool": {"numWorkers": 1} - } - }`})) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil) - assert.NoError(t, err) - - r, err := sslClient.Do(req) - assert.NoError(t, err) - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("fail to close the Body: error %v", err2) - } - - c.Stop() -} - -func Test_SSL_Service_NoRedirect(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ - "address": ":6030", - "ssl": { - "port": 6901, - "key": "fixtures/server.key", - "cert": "fixtures/server.crt" - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "pool": {"numWorkers": 1} - } - }`})) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) - assert.NoError(t, err) - - r, err := sslClient.Do(req) - assert.NoError(t, err) - - assert.Nil(t, r.TLS) - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("fail to close the Body: error %v", err2) - } - c.Stop() -} - -func Test_SSL_Service_Redirect(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ - "address": ":6831", - "ssl": { - "port": 6902, - "redirect": true, - "key": "fixtures/server.key", - "cert": "fixtures/server.crt" - }, - "workers":{ - "command": "php ../../tests/http/client.php echo pipes", - "pool": {"numWorkers": 1} - } - }`})) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil) - assert.NoError(t, err) - - r, err := sslClient.Do(req) - assert.NoError(t, err) - assert.NotNil(t, r.TLS) - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("fail to close the Body: error %v", err2) - } - c.Stop() -} - -func Test_SSL_Service_Push(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ - "address": ":6032", - "ssl": { - "port": 6903, - "redirect": true, - "key": "fixtures/server.key", - "cert": "fixtures/server.crt" - }, - "workers":{ - "command": "php ../../tests/http/client.php push pipes", - "pool": {"numWorkers": 1} - } - }`})) - - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - // should do nothing - s.(*Service).Stop() - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 500) - - req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil) - assert.NoError(t, err) - - r, err := sslClient.Do(req) - assert.NoError(t, err) - - assert.NotNil(t, r.TLS) - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.Equal(t, "", r.Header.Get("Http2-Push")) - - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - - - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("fail to close the Body: error %v", err2) - } - c.Stop() -} +// +//import ( +// "crypto/tls" +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner/service" +// "github.com/stretchr/testify/assert" +// "io/ioutil" +// "net/http" +// "testing" +// "time" +//) +// +//var sslClient = &http.Client{ +// Transport: &http.Transport{ +// TLSClientConfig: &tls.Config{ +// InsecureSkipVerify: true, +// }, +// }, +//} +// +//func Test_SSL_Service_Echo(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6029", +// "ssl": { +// "port": 6900, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// +// c.Stop() +//} +// +//func Test_SSL_Service_NoRedirect(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6030", +// "ssl": { +// "port": 6901, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// assert.Nil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} +// +//func Test_SSL_Service_Redirect(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6831", +// "ssl": { +// "port": 6902, +// "redirect": true, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php echo pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// assert.NotNil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} +// +//func Test_SSL_Service_Push(t *testing.T) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(ID, &Service{}) +// +// assert.NoError(t, c.Init(&testCfg{httpCfg: `{ +// "address": ":6032", +// "ssl": { +// "port": 6903, +// "redirect": true, +// "key": "fixtures/server.key", +// "cert": "fixtures/server.crt" +// }, +// "workers":{ +// "command": "php ../../tests/http/client.php push pipes", +// "pool": {"numWorkers": 1} +// } +// }`})) +// +// s, st := c.Get(ID) +// assert.NotNil(t, s) +// assert.Equal(t, service.StatusOK, st) +// +// // should do nothing +// s.(*Service).Stop() +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 500) +// +// req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil) +// assert.NoError(t, err) +// +// r, err := sslClient.Do(req) +// assert.NoError(t, err) +// +// assert.NotNil(t, r.TLS) +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.Equal(t, "", r.Header.Get("Http2-Push")) +// +// assert.NoError(t, err) +// assert.Equal(t, 201, r.StatusCode) +// assert.Equal(t, "WORLD", string(b)) +// +// +// err2 := r.Body.Close() +// if err2 != nil { +// t.Errorf("fail to close the Body: error %v", err2) +// } +// c.Stop() +//} diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go index 16365f39..b023b28f 100644 --- a/plugins/http/uploads_test.go +++ b/plugins/http/uploads_test.go @@ -1,434 +1,435 @@ package http -import ( - "bytes" - "context" - "crypto/md5" - "encoding/hex" - "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "os" - "testing" - "time" - - json "github.com/json-iterator/go" - "github.com/stretchr/testify/assert" -) - -func TestHandler_Upload_File(t *testing.T) { - h := &Handler{ - cfg: &Config{ - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{}, - }, - }, - rr: roadrunner.NewServer(&roadrunner.ServerConfig{ - Command: "php ../../tests/http/client.php upload pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: 10000000, - DestroyTimeout: 10000000, - }, - }), - } - - assert.NoError(t, h.rr.Start()) - defer h.rr.Stop() - - hs := &http.Server{Addr: ":8021", Handler: h} - defer func() { - err := hs.Shutdown(context.Background()) - if err != nil { - t.Errorf("error during the shutdown: error %v", err) - } - }() - - go func() { - err := hs.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - t.Errorf("error listening the interface: error %v", err) - } - }() - time.Sleep(time.Millisecond * 10) - - var mb bytes.Buffer - w := multipart.NewWriter(&mb) - - f := mustOpen("uploads_test.go") - defer func() { - err := f.Close() - if err != nil { - t.Errorf("failed to close a file: error %v", err) - } - }() - fw, err := w.CreateFormFile("upload", f.Name()) - assert.NotNil(t, fw) - assert.NoError(t, err) - _, err = io.Copy(fw, f) - if err != nil { - t.Errorf("error copying the file: error %v", err) - } - - err = w.Close() - if err != nil { - t.Errorf("error closing the file: error %v", err) - } - - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) - assert.NoError(t, err) - - req.Header.Set("Content-Type", w.FormDataContentType()) - - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() - if err != nil { - t.Errorf("error closing the Body: error %v", err) - } - }() - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - - fs := fileString("uploads_test.go", 0, "application/octet-stream") - - assert.Equal(t, `{"upload":`+fs+`}`, string(b)) -} - -func TestHandler_Upload_NestedFile(t *testing.T) { - h := &Handler{ - cfg: &Config{ - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{}, - }, - }, - rr: roadrunner.NewServer(&roadrunner.ServerConfig{ - Command: "php ../../tests/http/client.php upload pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: 10000000, - DestroyTimeout: 10000000, - }, - }), - } - - assert.NoError(t, h.rr.Start()) - defer h.rr.Stop() - - hs := &http.Server{Addr: ":8021", Handler: h} - defer func() { - err := hs.Shutdown(context.Background()) - if err != nil { - t.Errorf("error during the shutdown: error %v", err) - } - }() - - go func() { - err := hs.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - t.Errorf("error listening the interface: error %v", err) - } - }() - time.Sleep(time.Millisecond * 10) - - var mb bytes.Buffer - w := multipart.NewWriter(&mb) - - f := mustOpen("uploads_test.go") - defer func() { - err := f.Close() - if err != nil { - t.Errorf("failed to close a file: error %v", err) - } - }() - fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) - assert.NotNil(t, fw) - assert.NoError(t, err) - _, err = io.Copy(fw, f) - if err != nil { - t.Errorf("error copying the file: error %v", err) - } - - err = w.Close() - if err != nil { - t.Errorf("error closing the file: error %v", err) - } - - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) - assert.NoError(t, err) - - req.Header.Set("Content-Type", w.FormDataContentType()) - - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() - if err != nil { - t.Errorf("error closing the Body: error %v", err) - } - }() - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - - fs := fileString("uploads_test.go", 0, "application/octet-stream") - - assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) -} - -func TestHandler_Upload_File_NoTmpDir(t *testing.T) { - h := &Handler{ - cfg: &Config{ - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: "-----", - Forbid: []string{}, - }, - }, - rr: roadrunner.NewServer(&roadrunner.ServerConfig{ - Command: "php ../../tests/http/client.php upload pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: 10000000, - DestroyTimeout: 10000000, - }, - }), - } - - assert.NoError(t, h.rr.Start()) - defer h.rr.Stop() - - hs := &http.Server{Addr: ":8021", Handler: h} - defer func() { - err := hs.Shutdown(context.Background()) - if err != nil { - t.Errorf("error during the shutdown: error %v", err) - } - }() - - go func() { - err := hs.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - t.Errorf("error listening the interface: error %v", err) - } - }() - time.Sleep(time.Millisecond * 10) - - var mb bytes.Buffer - w := multipart.NewWriter(&mb) - - f := mustOpen("uploads_test.go") - defer func() { - err := f.Close() - if err != nil { - t.Errorf("failed to close a file: error %v", err) - } - }() - fw, err := w.CreateFormFile("upload", f.Name()) - assert.NotNil(t, fw) - assert.NoError(t, err) - _, err = io.Copy(fw, f) - if err != nil { - t.Errorf("error copying the file: error %v", err) - } - - err = w.Close() - if err != nil { - t.Errorf("error closing the file: error %v", err) - } - - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) - assert.NoError(t, err) - - req.Header.Set("Content-Type", w.FormDataContentType()) - - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() - if err != nil { - t.Errorf("error closing the Body: error %v", err) - } - }() - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - - fs := fileString("uploads_test.go", 5, "application/octet-stream") - - assert.Equal(t, `{"upload":`+fs+`}`, string(b)) -} - -func TestHandler_Upload_File_Forbids(t *testing.T) { - h := &Handler{ - cfg: &Config{ - MaxRequestSize: 1024, - Uploads: &UploadsConfig{ - Dir: os.TempDir(), - Forbid: []string{".go"}, - }, - }, - rr: roadrunner.NewServer(&roadrunner.ServerConfig{ - Command: "php ../../tests/http/client.php upload pipes", - Relay: "pipes", - Pool: &roadrunner.Config{ - NumWorkers: 1, - AllocateTimeout: 10000000, - DestroyTimeout: 10000000, - }, - }), - } - - assert.NoError(t, h.rr.Start()) - defer h.rr.Stop() - - hs := &http.Server{Addr: ":8021", Handler: h} - defer func() { - err := hs.Shutdown(context.Background()) - if err != nil { - t.Errorf("error during the shutdown: error %v", err) - } - }() - - go func() { - err := hs.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - t.Errorf("error listening the interface: error %v", err) - } - }() - time.Sleep(time.Millisecond * 10) - - var mb bytes.Buffer - w := multipart.NewWriter(&mb) - - f := mustOpen("uploads_test.go") - defer func() { - err := f.Close() - if err != nil { - t.Errorf("failed to close a file: error %v", err) - } - }() - fw, err := w.CreateFormFile("upload", f.Name()) - assert.NotNil(t, fw) - assert.NoError(t, err) - _, err = io.Copy(fw, f) - if err != nil { - t.Errorf("error copying the file: error %v", err) - } - - err = w.Close() - if err != nil { - t.Errorf("error closing the file: error %v", err) - } - - req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) - assert.NoError(t, err) - - req.Header.Set("Content-Type", w.FormDataContentType()) - - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() - if err != nil { - t.Errorf("error closing the Body: error %v", err) - } - }() - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - - fs := fileString("uploads_test.go", 7, "application/octet-stream") - - assert.Equal(t, `{"upload":`+fs+`}`, string(b)) -} - -func Test_FileExists(t *testing.T) { - assert.True(t, exists("uploads_test.go")) - assert.False(t, exists("uploads_test.")) -} - -func mustOpen(f string) *os.File { - r, err := os.Open(f) - if err != nil { - panic(err) - } - return r -} - -type fInfo struct { - Name string `json:"name"` - Size int64 `json:"size"` - Mime string `json:"mime"` - Error int `json:"error"` - MD5 string `json:"md5,omitempty"` -} - -func fileString(f string, errNo int, mime string) string { - s, err := os.Stat(f) - if err != nil { - fmt.Println(fmt.Errorf("error stat the file, error: %v", err)) - } - - ff, err := os.Open(f) - if err != nil { - fmt.Println(fmt.Errorf("error opening the file, error: %v", err)) - } - - defer func() { - er := ff.Close() - if er != nil { - fmt.Println(fmt.Errorf("error closing the file, error: %v", er)) - } - }() - - h := md5.New() - _, err = io.Copy(h, ff) - if err != nil { - fmt.Println(fmt.Errorf("error copying the file, error: %v", err)) - } - - v := &fInfo{ - Name: s.Name(), - Size: s.Size(), - Error: errNo, - Mime: mime, - MD5: hex.EncodeToString(h.Sum(nil)), - } - - if errNo != 0 { - v.MD5 = "" - v.Size = 0 - } - - j := json.ConfigCompatibleWithStandardLibrary - r, err := j.Marshal(v) - if err != nil { - fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err)) - } - return string(r) - -} +// +//import ( +// "bytes" +// "context" +// "crypto/md5" +// "encoding/hex" +// "fmt" +// "io" +// "io/ioutil" +// "mime/multipart" +// "net/http" +// "os" +// "testing" +// "time" +// +// json "github.com/json-iterator/go" +// "github.com/stretchr/testify/assert" +//) +// +//func TestHandler_Upload_File(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 0, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func TestHandler_Upload_NestedFile(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 0, "application/octet-stream") +// +// assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) +//} +// +//func TestHandler_Upload_File_NoTmpDir(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: "-----", +// Forbid: []string{}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 5, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func TestHandler_Upload_File_Forbids(t *testing.T) { +// h := &Handler{ +// cfg: &Config{ +// MaxRequestSize: 1024, +// Uploads: &UploadsConfig{ +// Dir: os.TempDir(), +// Forbid: []string{".go"}, +// }, +// }, +// pool: roadrunner.NewServer(&roadrunner.ServerConfig{ +// Command: "php ../../tests/http/client.php upload pipes", +// Relay: "pipes", +// Pool: &roadrunner.Config{ +// NumWorkers: 1, +// AllocateTimeout: 10000000, +// DestroyTimeout: 10000000, +// }, +// }), +// } +// +// assert.NoError(t, h.pool.Start()) +// defer h.pool.Stop() +// +// hs := &http.Server{Addr: ":8021", Handler: h} +// defer func() { +// err := hs.Shutdown(context.Background()) +// if err != nil { +// t.Errorf("error during the shutdown: error %v", err) +// } +// }() +// +// go func() { +// err := hs.ListenAndServe() +// if err != nil && err != http.ErrServerClosed { +// t.Errorf("error listening the interface: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 10) +// +// var mb bytes.Buffer +// w := multipart.NewWriter(&mb) +// +// f := mustOpen("uploads_test.go") +// defer func() { +// err := f.Close() +// if err != nil { +// t.Errorf("failed to close a file: error %v", err) +// } +// }() +// fw, err := w.CreateFormFile("upload", f.Name()) +// assert.NotNil(t, fw) +// assert.NoError(t, err) +// _, err = io.Copy(fw, f) +// if err != nil { +// t.Errorf("error copying the file: error %v", err) +// } +// +// err = w.Close() +// if err != nil { +// t.Errorf("error closing the file: error %v", err) +// } +// +// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) +// assert.NoError(t, err) +// +// req.Header.Set("Content-Type", w.FormDataContentType()) +// +// r, err := http.DefaultClient.Do(req) +// assert.NoError(t, err) +// defer func() { +// err := r.Body.Close() +// if err != nil { +// t.Errorf("error closing the Body: error %v", err) +// } +// }() +// +// b, err := ioutil.ReadAll(r.Body) +// assert.NoError(t, err) +// +// assert.NoError(t, err) +// assert.Equal(t, 200, r.StatusCode) +// +// fs := fileString("uploads_test.go", 7, "application/octet-stream") +// +// assert.Equal(t, `{"upload":`+fs+`}`, string(b)) +//} +// +//func Test_FileExists(t *testing.T) { +// assert.True(t, exists("uploads_test.go")) +// assert.False(t, exists("uploads_test.")) +//} +// +//func mustOpen(f string) *os.File { +// r, err := os.Open(f) +// if err != nil { +// panic(err) +// } +// return r +//} +// +//type fInfo struct { +// Name string `json:"name"` +// Size int64 `json:"size"` +// Mime string `json:"mime"` +// Error int `json:"error"` +// MD5 string `json:"md5,omitempty"` +//} +// +//func fileString(f string, errNo int, mime string) string { +// s, err := os.Stat(f) +// if err != nil { +// fmt.Println(fmt.Errorf("error stat the file, error: %v", err)) +// } +// +// ff, err := os.Open(f) +// if err != nil { +// fmt.Println(fmt.Errorf("error opening the file, error: %v", err)) +// } +// +// defer func() { +// er := ff.Close() +// if er != nil { +// fmt.Println(fmt.Errorf("error closing the file, error: %v", er)) +// } +// }() +// +// h := md5.New() +// _, err = io.Copy(h, ff) +// if err != nil { +// fmt.Println(fmt.Errorf("error copying the file, error: %v", err)) +// } +// +// v := &fInfo{ +// Name: s.Name(), +// Size: s.Size(), +// Error: errNo, +// Mime: mime, +// MD5: hex.EncodeToString(h.Sum(nil)), +// } +// +// if errNo != 0 { +// v.MD5 = "" +// v.Size = 0 +// } +// +// j := json.ConfigCompatibleWithStandardLibrary +// r, err := j.Marshal(v) +// if err != nil { +// fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err)) +// } +// return string(r) +// +//} diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index 840021eb..4d31138e 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -30,11 +30,11 @@ var testPoolConfig = roadrunner.PoolConfig{ type Foo struct { configProvider config.Configurer - wf server.WorkerFactory + wf server.Server pool roadrunner.Pool } -func (f *Foo) Init(p config.Configurer, workerFactory server.WorkerFactory) error { +func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index b12f4ead..4942d4c5 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -12,11 +12,11 @@ import ( type Foo2 struct { configProvider config.Configurer - wf server.WorkerFactory + wf server.Server pool roadrunner.Pool } -func (f *Foo2) Init(p config.Configurer, workerFactory server.WorkerFactory) error { +func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 39044577..89757a02 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -12,11 +12,11 @@ import ( type Foo3 struct { configProvider config.Configurer - wf server.WorkerFactory + wf server.Server pool roadrunner.Pool } -func (f *Foo3) Init(p config.Configurer, workerFactory server.WorkerFactory) error { +func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php index f83c3dd4..cd657502 100644 --- a/src/Exception/RoadRunnerException.php +++ b/src/Exception/RoadRunnerException.php @@ -9,6 +9,6 @@ declare(strict_types=1); namespace Spiral\RoadRunner\Exception; -class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException +class RoadRunnerException extends \RuntimeException { } diff --git a/src/Http/HttpClient.php b/src/Http/HttpClient.php new file mode 100644 index 00000000..4ca152c8 --- /dev/null +++ b/src/Http/HttpClient.php @@ -0,0 +1,75 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Alex Bond + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +final class HttpClient +{ + /** @var Worker */ + private $worker; + + /** + * @param Worker $worker + */ + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] + * or null if termination request or invalid context. + */ + public function acceptRequest(): ?array + { + $body = $this->getWorker()->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + $ctx = json_decode($ctx, true); + if ($ctx === null) { + // invalid context + return null; + } + + return ['ctx' => $ctx, 'body' => $body]; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } +} diff --git a/src/Http/PSR7Client.php b/src/Http/PSR7Client.php new file mode 100644 index 00000000..777dd891 --- /dev/null +++ b/src/Http/PSR7Client.php @@ -0,0 +1,217 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestFactoryInterface; +use Psr\Http\Message\ServerRequestInterface; +use Psr\Http\Message\StreamFactoryInterface; +use Psr\Http\Message\UploadedFileFactoryInterface; +use Psr\Http\Message\UploadedFileInterface; + +/** + * Manages PSR-7 request and response. + */ +class PSR7Client +{ + /** @var HttpClient */ + private $httpClient; + + /** @var ServerRequestFactoryInterface */ + private $requestFactory; + + /** @var StreamFactoryInterface */ + private $streamFactory; + + /** @var UploadedFileFactoryInterface */ + private $uploadsFactory; + + /** @var mixed[] */ + private $originalServer = []; + + /** @var string[] Valid values for HTTP protocol version */ + private static $allowedVersions = ['1.0', '1.1', '2',]; + + /** + * @param Worker $worker + * @param ServerRequestFactoryInterface|null $requestFactory + * @param StreamFactoryInterface|null $streamFactory + * @param UploadedFileFactoryInterface|null $uploadsFactory + */ + public function __construct( + Worker $worker, + ServerRequestFactoryInterface $requestFactory = null, + StreamFactoryInterface $streamFactory = null, + UploadedFileFactoryInterface $uploadsFactory = null + ) { + $this->httpClient = new HttpClient($worker); + $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); + $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); + $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->originalServer = $_SERVER; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->httpClient->getWorker(); + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest(): ?ServerRequestInterface + { + $rawRequest = $this->httpClient->acceptRequest(); + if ($rawRequest === null) { + return null; + } + + $_SERVER = $this->configureServer($rawRequest['ctx']); + + $request = $this->requestFactory->createServerRequest( + $rawRequest['ctx']['method'], + $rawRequest['ctx']['uri'], + $_SERVER + ); + + parse_str($rawRequest['ctx']['rawQuery'], $query); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) + ->withCookieParams($rawRequest['ctx']['cookies']) + ->withQueryParams($query) + ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); + + foreach ($rawRequest['ctx']['attributes'] as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($rawRequest['ctx']['headers'] as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($rawRequest['ctx']['parsed']) { + return $request->withParsedBody(json_decode($rawRequest['body'], true)); + } + + if ($rawRequest['body'] !== null) { + return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); + } + + return $request; + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response): void + { + $this->httpClient->respond( + $response->getStatusCode(), + $response->getBody()->__toString(), + $response->getHeaders() + ); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param mixed[] $ctx + * @return mixed[] + */ + protected function configureServer(array $ctx): array + { + $server = $this->originalServer; + + $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + $server['REQUEST_METHOD'] = $ctx['method']; + + $server['HTTP_USER_AGENT'] = ''; + foreach ($ctx['headers'] as $key => $value) { + $key = strtoupper(str_replace('-', '_', $key)); + if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { + $server[$key] = implode(', ', $value); + } else { + $server['HTTP_' . $key] = implode(', ', $value); + } + } + + return $server; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array[] $files + * + * @return UploadedFileInterface[]|mixed[] + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + if (UPLOAD_ERR_OK === $f['error']) { + $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); + } else { + $stream = $this->streamFactory->createStream(); + } + + $result[$index] = $this->uploadsFactory->createUploadedFile( + $stream, + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } + + /** + * Normalize HTTP protocol version to valid values + * + * @param string $version + * @return string + */ + private static function fetchProtocolVersion(string $version): string + { + $v = substr($version, 5); + + if ($v === '2.0') { + return '2'; + } + + // Fallback for values outside of valid protocol versions + if (!in_array($v, static::$allowedVersions, true)) { + return '1.1'; + } + + return $v; + } +} diff --git a/src/Logger/.empty b/src/Logger/.empty new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/Logger/.empty diff --git a/src/Metrics/Metrics.php b/src/Metrics/Metrics.php new file mode 100644 index 00000000..d6b6e1da --- /dev/null +++ b/src/Metrics/Metrics.php @@ -0,0 +1,80 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\RPCException; +use Spiral\Goridge\RPC; +use Spiral\RoadRunner\Exception\MetricException; + +/** + * Application metrics. + */ +final class Metrics implements MetricsInterface +{ + /** @var RPC */ + private $rpc; + + /** + * @param RPC $rpc + */ + public function __construct(RPC $rpc) + { + $this->rpc = $rpc; + } + + /** + * @inheritDoc + */ + public function add(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function sub(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function observe(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function set(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } +} diff --git a/src/Metrics/MetricsInterface.php b/src/Metrics/MetricsInterface.php new file mode 100644 index 00000000..ec2009b0 --- /dev/null +++ b/src/Metrics/MetricsInterface.php @@ -0,0 +1,64 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\MetricException; + +interface MetricsInterface +{ + /** + * Add collector value. Fallback to appropriate method of related collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function add(string $collector, float $value, array $labels = []); + + /** + * Subtract the collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function sub(string $collector, float $value, array $labels = []); + + /** + * Observe collector value, only for histogram and summary collectors. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function observe(string $collector, float $value, array $labels = []); + + /** + * Set collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function set(string $collector, float $value, array $labels = []); +} diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/WorkerInterface.php |