summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xgo.mod3
-rwxr-xr-xgo.sum1
-rw-r--r--interfaces/server/interface.go4
-rw-r--r--plugins/http/attributes/attributes_test.go1
-rw-r--r--plugins/http/config.go4
-rw-r--r--plugins/http/handler_test.go166
-rw-r--r--plugins/http/plugin.go144
-rw-r--r--plugins/http/plugin_test.go1514
-rw-r--r--plugins/http/response.go2
-rw-r--r--plugins/http/response_test.go16
-rw-r--r--plugins/http/rpc_test.go439
-rw-r--r--plugins/http/ssl_test.go505
-rw-r--r--plugins/http/uploads_test.go865
-rw-r--r--plugins/server/tests/plugin_pipes.go4
-rw-r--r--plugins/server/tests/plugin_sockets.go4
-rw-r--r--plugins/server/tests/plugin_tcp.go4
-rw-r--r--src/Exception/RoadRunnerException.php2
-rw-r--r--src/Http/HttpClient.php75
-rw-r--r--src/Http/PSR7Client.php217
-rw-r--r--src/Logger/.empty0
-rw-r--r--src/Metrics/Metrics.php80
-rw-r--r--src/Metrics/MetricsInterface.php64
-rw-r--r--src/WorkerInterface.php0
23 files changed, 2263 insertions, 1851 deletions
diff --git a/go.mod b/go.mod
index 0cbbfc70..5cd5e540 100755
--- a/go.mod
+++ b/go.mod
@@ -3,16 +3,19 @@ module github.com/spiral/roadrunner/v2
go 1.15
require (
+ github.com/cenkalti/backoff/v4 v4.1.0
github.com/fatih/color v1.10.0
github.com/hashicorp/go-multierror v1.0.0
github.com/json-iterator/go v1.1.10
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/shirou/gopsutil v3.20.10+incompatible
+ github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta19
github.com/spiral/errors v1.0.4
github.com/spiral/goridge/v2 v2.4.6
+ github.com/spiral/roadrunner v1.8.4
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
go.uber.org/multierr v1.6.0
diff --git a/go.sum b/go.sum
index dcec5726..98942460 100755
--- a/go.sum
+++ b/go.sum
@@ -238,6 +238,7 @@ github.com/shirou/gopsutil v3.20.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
diff --git a/interfaces/server/interface.go b/interfaces/server/interface.go
index 51d172cb..2dae30c5 100644
--- a/interfaces/server/interface.go
+++ b/interfaces/server/interface.go
@@ -9,8 +9,8 @@ import (
type Env map[string]string
-// WorkerFactory creates workers for the application.
-type WorkerFactory interface {
+// Server creates workers for the application.
+type Server interface {
CmdFactory(env Env) (func() *exec.Cmd, error)
NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env Env) (roadrunner.Pool, error)
diff --git a/plugins/http/attributes/attributes_test.go b/plugins/http/attributes/attributes_test.go
index d914f6fa..a4c85eea 100644
--- a/plugins/http/attributes/attributes_test.go
+++ b/plugins/http/attributes/attributes_test.go
@@ -71,7 +71,6 @@ func TestSetAttribute(t *testing.T) {
func TestSetAttributeNone(t *testing.T) {
r := &http.Request{}
-
err := Set(r, "key", "value")
if err != nil {
t.Errorf("error during the Set: error %v", err)
diff --git a/plugins/http/config.go b/plugins/http/config.go
index 7922f485..b3f4ca13 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -19,7 +19,7 @@ type ServerConfig struct {
User string
// Relay defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
+ // "pipes", "tcp://:6001", "unix://pool.sock"
// This config section must not change on re-configuration.
Relay string
@@ -111,7 +111,7 @@ func (c *Config) EnableHTTP() bool {
return c.Address != ""
}
-// EnableTLS returns true if rr must listen TLS connections.
+// EnableTLS returns true if pool must listen TLS connections.
func (c *Config) EnableTLS() bool {
return c.SSL.Key != "" || c.SSL.Cert != "" || c.SSL.RootCA != ""
}
diff --git a/plugins/http/handler_test.go b/plugins/http/handler_test.go
index d9726e81..d15cf96f 100644
--- a/plugins/http/handler_test.go
+++ b/plugins/http/handler_test.go
@@ -73,7 +73,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echo pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -84,8 +84,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -118,7 +118,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echo pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -145,7 +145,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echo pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -174,7 +174,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php header pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -185,8 +185,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8078", Handler: h}
// defer func() {
@@ -237,7 +237,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php user-agent pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -248,8 +248,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8088", Handler: h}
// defer func() {
@@ -299,7 +299,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php user-agent pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -310,8 +310,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8088", Handler: h}
// defer func() {
@@ -361,7 +361,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php cookie pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -372,8 +372,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8079", Handler: h}
// defer func() {
@@ -428,7 +428,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php payload pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -439,8 +439,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8090", Handler: h}
// defer func() {
@@ -494,7 +494,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php payload pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -505,8 +505,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8081", Handler: h}
// defer func() {
@@ -555,7 +555,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php payload pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -566,8 +566,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8082", Handler: h}
// defer func() {
@@ -617,7 +617,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -628,8 +628,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8083", Handler: h}
// defer func() {
@@ -691,7 +691,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -702,8 +702,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8083", Handler: h}
// defer func() {
@@ -759,7 +759,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -770,8 +770,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8083", Handler: h}
// defer func() {
@@ -833,7 +833,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -844,8 +844,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8084", Handler: h}
// defer func() {
@@ -907,7 +907,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -918,8 +918,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8085", Handler: h}
// defer func() {
@@ -981,7 +981,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -992,8 +992,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8019", Handler: h}
// defer func() {
@@ -1097,7 +1097,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1108,8 +1108,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8020", Handler: h}
// defer func() {
@@ -1213,7 +1213,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php data pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1224,8 +1224,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8021", Handler: h}
// defer func() {
@@ -1331,7 +1331,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php error pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1342,8 +1342,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1375,7 +1375,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php error2 pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1386,8 +1386,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1419,7 +1419,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php pid pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1430,8 +1430,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1480,7 +1480,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echo pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1491,8 +1491,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1539,7 +1539,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echoDelay pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1550,8 +1550,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1598,7 +1598,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php error pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1609,8 +1609,8 @@ package http
// }),
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
@@ -1665,7 +1665,7 @@ package http
// "fe80::/10",
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php ip pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1681,8 +1681,8 @@ package http
// t.Errorf("error parsing CIDRs: error %v", err)
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
// defer func() {
@@ -1724,7 +1724,7 @@ package http
// "fe80::/10",
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php ip pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1740,8 +1740,8 @@ package http
// t.Errorf("error parsing CIDRs: error %v", err)
// }
//
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
// defer func() {
@@ -1788,7 +1788,7 @@ package http
// "fe80::/10",
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php ip pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1803,8 +1803,8 @@ package http
// if err != nil {
// t.Errorf("error parsing CIDRs: error %v", err)
// }
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
// defer func() {
@@ -1851,7 +1851,7 @@ package http
// "10.0.0.0/8",
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php ip pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1866,8 +1866,8 @@ package http
// if err != nil {
// t.Errorf("error parsing CIDRs: error %v", err)
// }
-// assert.NoError(t, h.rr.Start())
-// defer h.rr.Stop()
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
// defer func() {
@@ -1903,7 +1903,7 @@ package http
// Forbid: []string{},
// },
// },
-// rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
// Command: "php ../../tests/http/client.php echo pipes",
// Relay: "pipes",
// Pool: &roadrunner.Config{
@@ -1914,11 +1914,11 @@ package http
// }),
// }
//
-// err := h.rr.Start()
+// err := h.pool.Start()
// if err != nil {
// b.Errorf("error starting the worker pool: error %v", err)
// }
-// defer h.rr.Stop()
+// defer h.pool.Stop()
//
// hs := &http.Server{Addr: ":8177", Handler: h}
// defer func() {
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 94b6c74b..581455b3 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -37,20 +37,20 @@ const (
// http middleware type.
type middleware func(f http.HandlerFunc) http.HandlerFunc
-// Service manages rr, http servers.
+// Service manages pool, http servers.
type Plugin struct {
sync.Mutex
sync.WaitGroup
- cfg *Config
- log log.Logger
+ cfg *Config
+ configurer config.Configurer
+ log log.Logger
- //cprod roadrunner.CommandProducer
- env map[string]string
- lsns []func(event int, ctx interface{})
- mdwr []middleware
+ mdwr []middleware
+ listeners []util.EventListener
- rr roadrunner.Pool
+ pool roadrunner.Pool
+ server factory.Server
//controller roadrunner.Controller
handler *Handler
@@ -59,35 +59,29 @@ type Plugin struct {
fcgi *http.Server
}
-//// Attach attaches controller. Currently only one controller is supported.
-//func (s *Service) Attach(w roadrunner.Controller) {
-// s.controller = w
-//}
-//
-//// ProduceCommands changes the default command generator method
-//func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) {
-// s.cprod = producer
-//}
-
// AddMiddleware adds new net/http mdwr.
func (s *Plugin) AddMiddleware(m middleware) {
s.mdwr = append(s.mdwr, m)
}
// AddListener attaches server event controller.
-func (s *Plugin) AddListener(l func(event int, ctx interface{})) {
- s.lsns = append(s.lsns, l)
+func (s *Plugin) AddListener(listener util.EventListener) {
+ // save listeners for Reset
+ s.listeners = append(s.listeners, listener)
+ s.pool.AddListener(listener)
}
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.WorkerFactory) error {
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Server) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(ServiceName, &s.cfg)
if err != nil {
return errors.E(op, err)
}
+ s.configurer = cfg
+ s.listeners = make([]util.EventListener, 0, 1)
s.log = log
// Set needed env vars
@@ -107,7 +101,7 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Work
return errors.E(op, err)
}
- s.rr = p
+ s.pool = p
//if r != nil {
// if err := r.Register(ID, &rpcServer{s}); err != nil {
@@ -125,6 +119,8 @@ func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server factory.Work
// Serve serves the svc.
func (s *Plugin) Serve() chan error {
s.Lock()
+ defer s.Unlock()
+
const op = errors.Op("serve http")
errCh := make(chan error, 2)
@@ -137,14 +133,14 @@ func (s *Plugin) Serve() chan error {
//s.cfg.Workers.CommandProducer = s.cprod
//s.cfg.Workers.SetEnv("RR_HTTP", "true")
//
- //s.rr = roadrunner.NewServer(s.cfg.Workers)
- //s.rr.Listen(s.throw)
+ //s.pool = roadrunner.NewServer(s.cfg.Workers)
+ //s.pool.Listen(s.throw)
//
//if s.controller != nil {
- // s.rr.Attach(s.controller)
+ // s.pool.Attach(s.controller)
//}
- s.handler = &Handler{cfg: s.cfg, rr: s.rr}
+ s.handler = &Handler{cfg: s.cfg, rr: s.pool}
//s.handler.Listen(s.throw)
if s.cfg.EnableHTTP() {
@@ -177,12 +173,10 @@ func (s *Plugin) Serve() chan error {
s.fcgi = &http.Server{Handler: s}
}
- s.Unlock()
-
- //if err := s.rr.Start(); err != nil {
+ //if err := s.pool.Start(); err != nil {
// return err
//}
- //defer s.rr.Stop()
+ //defer s.pool.Stop()
if s.http != nil {
go func() {
@@ -260,7 +254,7 @@ func (s *Plugin) Stop() error {
return err
}
-// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
+// ServeHTTP handles connection using set of middleware and pool PSR-7 server.
func (s *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
target := &url.URL{
@@ -385,7 +379,6 @@ func (s *Plugin) initSSL() *http.Server {
PreferServerCipherSuites: true,
},
}
- //s.throw(EventInitSSL, server)
return server
}
@@ -419,7 +412,7 @@ func (s *Plugin) serveFCGI() error {
// }
//
// if event == roadrunner.EventServerFailure {
-// // underlying rr server is dead
+// // underlying pool server is dead
// s.Stop()
// }
//}
@@ -436,65 +429,42 @@ func (s *Plugin) tlsAddr(host string, forcePort bool) string {
return host
}
-// Server returns associated rr workers
+// Server returns associated pool workers
func (s *Plugin) Workers() []roadrunner.WorkerBase {
- return s.rr.Workers()
+ return s.pool.Workers()
}
func (s *Plugin) Reset() error {
+ s.Lock()
+ defer s.Unlock()
+ s.pool.Destroy(context.Background())
+
+ // Set needed env vars
+ env := make(map[string]string)
+ env["RR_HTTP"] = "true"
+ var err error
+
// re-read the config
- // destroy the pool
- // attach new one
+ err = s.configurer.UnmarshalKey(ServiceName, &s.cfg)
+ if err != nil {
+ return err
+ }
- //s.mup.Lock()
- //defer s.mup.Unlock()
- //
- //s.mu.Lock()
- //if !s.started {
- // s.cfg = cfg
- // s.mu.Unlock()
- // return nil
- //}
- //s.mu.Unlock()
- //
- //if s.cfg.Differs(cfg) {
- // return errors.New("unable to reconfigure server (cmd and pool changes are allowed)")
- //}
- //
- //s.mu.Lock()
- //previous := s.pool
- //pWatcher := s.pController
- //s.mu.Unlock()
- //
- //pool, err := NewPool(cfg.makeCommand(), s.factory, *cfg.Pool)
- //if err != nil {
- // return err
- //}
- //
- //pool.Listen(s.poolListener)
- //
- //s.mu.Lock()
- //s.cfg.Pool, s.pool = cfg.Pool, pool
- //
- //if s.controller != nil {
- // s.pController = s.controller.Attach(pool)
- //}
- //
- //s.mu.Unlock()
- //
- //s.throw(EventPoolConstruct, pool)
- //
- //if previous != nil {
- // go func(previous Pool, pWatcher Controller) {
- // s.throw(EventPoolDestruct, previous)
- // if pWatcher != nil {
- // pWatcher.Detach()
- // }
- //
- // previous.Destroy()
- // }(previous, pWatcher)
- //}
- //
- //return nil
+ s.pool, err = s.server.NewWorkerPool(context.Background(), roadrunner.PoolConfig{
+ Debug: false,
+ NumWorkers: 0,
+ MaxJobs: 0,
+ AllocateTimeout: 0,
+ DestroyTimeout: 0,
+ Supervisor: nil,
+ }, env)
+ if err != nil {
+ return err
+ }
+
+ // restore original listeners
+ for i := 0; i < len(s.listeners); i++ {
+ s.pool.AddListener(s.listeners[i])
+ }
return nil
}
diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go
index f7ee33cc..012cea04 100644
--- a/plugins/http/plugin_test.go
+++ b/plugins/http/plugin_test.go
@@ -1,759 +1,759 @@
package http
-import (
- "github.com/cenkalti/backoff/v4"
- json "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/roadrunner"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
- "io/ioutil"
- "net/http"
- "os"
- "testing"
- "time"
-)
-
-type testCfg struct {
- httpCfg string
- rpcCfg string
- envCfg string
- target string
-}
-
-func (cfg *testCfg) Get(name string) service.Config {
- if name == ID {
- if cfg.httpCfg == "" {
- return nil
- }
-
- return &testCfg{target: cfg.httpCfg}
- }
-
- if name == rpc.ID {
- return &testCfg{target: cfg.rpcCfg}
- }
-
- if name == env.ID {
- return &testCfg{target: cfg.envCfg}
- }
-
- return nil
-}
-func (cfg *testCfg) Unmarshal(out interface{}) error {
- j := json.ConfigCompatibleWithStandardLibrary
- return j.Unmarshal([]byte(cfg.target), out)
-}
-
-func Test_Service_NoConfig(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
- assert.Error(t, err)
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusInactive, st)
-}
-
-func Test_Service_Configure_Disable(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{}))
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusInactive, st)
-}
-
-func Test_Service_Configure_Enable(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":8070",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-
-}
-
-func Test_Service_Echo(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6536",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("serve error: %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 100)
-
- req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil)
- if err != nil {
- c.Stop()
- return err
- }
-
- r, err := http.DefaultClient.Do(req)
- if err != nil {
- c.Stop()
- return err
- }
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- c.Stop()
- return err
- }
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
- err = r.Body.Close()
- if err != nil {
- c.Stop()
- return err
- }
-
- c.Stop()
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Service_Env(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(env.ID, env.NewService(map[string]string{"rr": "test"}))
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":10031",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php env pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`, envCfg: `{"env_key":"ENV_VALUE"}`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("serve error: %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "http://localhost:10031", nil)
- if err != nil {
- c.Stop()
- return err
- }
-
- r, err := http.DefaultClient.Do(req)
- if err != nil {
- c.Stop()
- return err
- }
-
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- c.Stop()
- return err
- }
-
- assert.Equal(t, 200, r.StatusCode)
- assert.Equal(t, "ENV_VALUE", string(b))
-
- err = r.Body.Close()
- if err != nil {
- c.Stop()
- return err
- }
-
- c.Stop()
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-
-}
-
-func Test_Service_ErrorEcho(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6030",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echoerr pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- goterr := make(chan interface{})
- s.(*Service).AddListener(func(event int, ctx interface{}) {
- if event == roadrunner.EventStderrOutput {
- if string(ctx.([]byte)) == "WORLD\n" {
- goterr <- nil
- }
- }
- })
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("serve error: %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
- if err != nil {
- c.Stop()
- return err
- }
-
- r, err := http.DefaultClient.Do(req)
- if err != nil {
- c.Stop()
- return err
- }
-
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- c.Stop()
- return err
- }
-
- <-goterr
-
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
- err = r.Body.Close()
- if err != nil {
- c.Stop()
- return err
- }
-
- c.Stop()
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Service_Middleware(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6032",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path == "/halt" {
- w.WriteHeader(500)
- _, err := w.Write([]byte("halted"))
- if err != nil {
- t.Errorf("error writing the data to the http reply: error %v", err)
- }
- } else {
- f(w, r)
- }
- }
- })
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("serve error: %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil)
- if err != nil {
- c.Stop()
- return err
- }
-
- r, err := http.DefaultClient.Do(req)
- if err != nil {
- c.Stop()
- return err
- }
-
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- c.Stop()
- return err
- }
-
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
- err = r.Body.Close()
- if err != nil {
- c.Stop()
- return err
- }
-
- req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil)
- if err != nil {
- c.Stop()
- return err
- }
-
- r, err = http.DefaultClient.Do(req)
- if err != nil {
- c.Stop()
- return err
- }
- b, err = ioutil.ReadAll(r.Body)
- if err != nil {
- c.Stop()
- return err
- }
-
- assert.Equal(t, 500, r.StatusCode)
- assert.Equal(t, "halted", string(b))
-
- err = r.Body.Close()
- if err != nil {
- c.Stop()
- return err
- }
- c.Stop()
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-
-}
-
-func Test_Service_Listener(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6033",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- stop := make(chan interface{})
- s.(*Service).AddListener(func(event int, ctx interface{}) {
- if event == roadrunner.EventServerStart {
- stop <- nil
- }
- })
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("serve error: %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 500)
-
- c.Stop()
- assert.True(t, true)
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Service_Error(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6034",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "relay": "---",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- // assert error
- err = c.Serve()
- if err == nil {
- return err
- }
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Service_Error2(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6035",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php broken pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- if err != nil {
- return err
- }
-
- // assert error
- err = c.Serve()
- if err == nil {
- return err
- }
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func Test_Service_Error3(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": ":6036",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers"
- "command": "php ../../tests/http/client.php broken pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- // assert error
- if err == nil {
- return err
- }
-
- return nil
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-
-}
-
-func Test_Service_Error4(t *testing.T) {
- bkoff := backoff.NewExponentialBackOff()
- bkoff.MaxElapsedTime = time.Second * 15
-
- err := backoff.Retry(func() error {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- err := c.Init(&testCfg{httpCfg: `{
- "enable": true,
- "address": "----",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php broken pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`})
- // assert error
- if err != nil {
- return nil
- }
-
- return err
- }, bkoff)
-
- if err != nil {
- t.Fatal(err)
- }
-}
-
-func tmpDir() string {
- p := os.TempDir()
- j := json.ConfigCompatibleWithStandardLibrary
- r, _ := j.Marshal(p)
-
- return string(r)
-}
+//import (
+// "github.com/cenkalti/backoff/v4"
+// json "github.com/json-iterator/go"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner"
+// "github.com/spiral/roadrunner/service"
+// "github.com/spiral/roadrunner/service/env"
+// "github.com/spiral/roadrunner/service/rpc"
+// "github.com/stretchr/testify/assert"
+// "io/ioutil"
+// "net/http"
+// "os"
+// "testing"
+// "time"
+//)
+//
+//type testCfg struct {
+// httpCfg string
+// rpcCfg string
+// envCfg string
+// target string
+//}
+//
+//func (cfg *testCfg) Get(name string) service.Config {
+// if name == ID {
+// if cfg.httpCfg == "" {
+// return nil
+// }
+//
+// return &testCfg{target: cfg.httpCfg}
+// }
+//
+// if name == rpc.ID {
+// return &testCfg{target: cfg.rpcCfg}
+// }
+//
+// if name == env.ID {
+// return &testCfg{target: cfg.envCfg}
+// }
+//
+// return nil
+//}
+//func (cfg *testCfg) Unmarshal(out interface{}) error {
+// j := json.ConfigCompatibleWithStandardLibrary
+// return j.Unmarshal([]byte(cfg.target), out)
+//}
+//
+//func Test_Service_NoConfig(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
+// assert.Error(t, err)
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusInactive, st)
+//}
+//
+//func Test_Service_Configure_Disable(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusInactive, st)
+//}
+//
+//func Test_Service_Configure_Enable(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":8070",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Echo(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6536",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 100)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Env(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(env.ID, env.NewService(map[string]string{"pool": "test"}))
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":10031",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php env pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`, envCfg: `{"env_key":"ENV_VALUE"}`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:10031", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 200, r.StatusCode)
+// assert.Equal(t, "ENV_VALUE", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_ErrorEcho(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6030",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echoerr pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// goterr := make(chan interface{})
+// s.(*Service).AddListener(func(event int, ctx interface{}) {
+// if event == roadrunner.EventStderrOutput {
+// if string(ctx.([]byte)) == "WORLD\n" {
+// goterr <- nil
+// }
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// <-goterr
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// c.Stop()
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Middleware(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6032",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
+// return func(w http.ResponseWriter, r *http.Request) {
+// if r.URL.Path == "/halt" {
+// w.WriteHeader(500)
+// _, err := w.Write([]byte("halted"))
+// if err != nil {
+// t.Errorf("error writing the data to the http reply: error %v", err)
+// }
+// } else {
+// f(w, r)
+// }
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err := http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// b, err := ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// r, err = http.DefaultClient.Do(req)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// b, err = ioutil.ReadAll(r.Body)
+// if err != nil {
+// c.Stop()
+// return err
+// }
+//
+// assert.Equal(t, 500, r.StatusCode)
+// assert.Equal(t, "halted", string(b))
+//
+// err = r.Body.Close()
+// if err != nil {
+// c.Stop()
+// return err
+// }
+// c.Stop()
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Listener(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6033",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// stop := make(chan interface{})
+// s.(*Service).AddListener(func(event int, ctx interface{}) {
+// if event == roadrunner.EventServerStart {
+// stop <- nil
+// }
+// })
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("serve error: %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// c.Stop()
+// assert.True(t, true)
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6034",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "relay": "---",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// // assert error
+// err = c.Serve()
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error2(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6035",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// if err != nil {
+// return err
+// }
+//
+// // assert error
+// err = c.Serve()
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func Test_Service_Error3(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": ":6036",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers"
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// // assert error
+// if err == nil {
+// return err
+// }
+//
+// return nil
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//
+//}
+//
+//func Test_Service_Error4(t *testing.T) {
+// bkoff := backoff.NewExponentialBackOff()
+// bkoff.MaxElapsedTime = time.Second * 15
+//
+// err := backoff.Retry(func() error {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// err := c.Init(&testCfg{httpCfg: `{
+// "enable": true,
+// "address": "----",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php broken pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`})
+// // assert error
+// if err != nil {
+// return nil
+// }
+//
+// return err
+// }, bkoff)
+//
+// if err != nil {
+// t.Fatal(err)
+// }
+//}
+//
+//func tmpDir() string {
+// p := os.TempDir()
+// j := json.ConfigCompatibleWithStandardLibrary
+// r, _ := j.Marshal(p)
+//
+// return string(r)
+//}
diff --git a/plugins/http/response.go b/plugins/http/response.go
index c3de434f..9ebc0632 100644
--- a/plugins/http/response.go
+++ b/plugins/http/response.go
@@ -21,7 +21,7 @@ type Response struct {
body interface{}
}
-// NewResponse creates new response based on given rr payload.
+// NewResponse creates new response based on given pool payload.
func NewResponse(p roadrunner.Payload) (*Response, error) {
r := &Response{body: p.Body}
j := json.ConfigCompatibleWithStandardLibrary
diff --git a/plugins/http/response_test.go b/plugins/http/response_test.go
index 1f394276..b5adbad9 100644
--- a/plugins/http/response_test.go
+++ b/plugins/http/response_test.go
@@ -6,7 +6,7 @@ import (
"net/http"
"testing"
- "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/v2"
"github.com/stretchr/testify/assert"
)
@@ -44,13 +44,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error {
}
func TestNewResponse_Error(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{Context: []byte(`invalid payload`)})
+ r, err := NewResponse(roadrunner.Payload{Context: []byte(`invalid payload`)})
assert.Error(t, err)
assert.Nil(t, r)
}
func TestNewResponse_Write(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
Body: []byte(`sample body`),
})
@@ -67,7 +67,7 @@ func TestNewResponse_Write(t *testing.T) {
}
func TestNewResponse_Stream(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -91,7 +91,7 @@ func TestNewResponse_Stream(t *testing.T) {
}
func TestNewResponse_StreamError(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(`{"headers":{"key":["value"]},"status": 301}`),
})
@@ -111,7 +111,7 @@ func TestNewResponse_StreamError(t *testing.T) {
}
func TestWrite_HandlesPush(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`),
})
@@ -126,7 +126,7 @@ func TestWrite_HandlesPush(t *testing.T) {
}
func TestWrite_HandlesTrailers(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`),
})
@@ -145,7 +145,7 @@ func TestWrite_HandlesTrailers(t *testing.T) {
}
func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) {
- r, err := NewResponse(&roadrunner.Payload{
+ r, err := NewResponse(roadrunner.Payload{
Context: []byte(
`{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`),
})
diff --git a/plugins/http/rpc_test.go b/plugins/http/rpc_test.go
index e57a8699..86499d46 100644
--- a/plugins/http/rpc_test.go
+++ b/plugins/http/rpc_test.go
@@ -1,221 +1,222 @@
package http
-import (
- json "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
- "os"
- "strconv"
- "testing"
- "time"
-)
-
-func Test_RPC(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
- httpCfg: `{
- "enable": true,
- "address": ":16031",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php pid pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`}))
-
- s, _ := c.Get(ID)
- ss := s.(*Service)
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
-
- time.Sleep(time.Second)
-
- res, _, err := get("http://localhost:16031")
- if err != nil {
- t.Fatal(err)
- }
- assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- r := ""
- assert.NoError(t, cl.Call("http.Reset", true, &r))
- assert.Equal(t, "OK", r)
-
- res2, _, err := get("http://localhost:16031")
- if err != nil {
- t.Fatal(err)
- }
- assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
- assert.NotEqual(t, res, res2)
- c.Stop()
-}
-
-func Test_RPC_Unix(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(ID, &Service{})
-
- sock := `unix://` + os.TempDir() + `/rpc.unix`
- j := json.ConfigCompatibleWithStandardLibrary
- data, _ := j.Marshal(sock)
-
- assert.NoError(t, c.Init(&testCfg{
- rpcCfg: `{"enable":true, "listen":` + string(data) + `}`,
- httpCfg: `{
- "enable": true,
- "address": ":6032",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php pid pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`}))
-
- s, _ := c.Get(ID)
- ss := s.(*Service)
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 500)
-
- res, _, err := get("http://localhost:6032")
- if err != nil {
- c.Stop()
- t.Fatal(err)
- }
- if ss.rr.Workers() != nil && len(ss.rr.Workers()) > 0 {
- assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
- } else {
- c.Stop()
- t.Fatal("no workers initialized")
- }
-
- cl, err := rs.Client()
- if err != nil {
- c.Stop()
- t.Fatal(err)
- }
-
- r := ""
- assert.NoError(t, cl.Call("http.Reset", true, &r))
- assert.Equal(t, "OK", r)
-
- res2, _, err := get("http://localhost:6032")
- if err != nil {
- c.Stop()
- t.Fatal(err)
- }
- assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
- assert.NotEqual(t, res, res2)
- c.Stop()
-}
-
-func Test_Workers(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(rpc.ID, &rpc.Service{})
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{
- rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
- httpCfg: `{
- "enable": true,
- "address": ":6033",
- "maxRequestSize": 1024,
- "uploads": {
- "dir": ` + tmpDir() + `,
- "forbid": []
- },
- "workers":{
- "command": "php ../../tests/http/client.php pid pipes",
- "relay": "pipes",
- "pool": {
- "numWorkers": 1,
- "allocateTimeout": 10000000,
- "destroyTimeout": 10000000
- }
- }
- }`}))
-
- s, _ := c.Get(ID)
- ss := s.(*Service)
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 500)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- r := &WorkerList{}
- assert.NoError(t, cl.Call("http.Workers", true, &r))
- assert.Len(t, r.Workers, 1)
-
- assert.Equal(t, *ss.rr.Workers()[0].Pid, r.Workers[0].Pid)
- c.Stop()
-}
-
-func Test_Errors(t *testing.T) {
- r := &rpcServer{nil}
-
- assert.Error(t, r.Reset(true, nil))
- assert.Error(t, r.Workers(true, nil))
-}
+//
+//import (
+// json "github.com/json-iterator/go"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner/service"
+// "github.com/spiral/roadrunner/service/rpc"
+// "github.com/stretchr/testify/assert"
+// "os"
+// "strconv"
+// "testing"
+// "time"
+//)
+//
+//func Test_RPC(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":16031",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Second)
+//
+// res, _, err := get("http://localhost:16031")
+// if err != nil {
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res)
+//
+// cl, err := rs.Client()
+// assert.NoError(t, err)
+//
+// r := ""
+// assert.NoError(t, cl.Call("http.Reset", true, &r))
+// assert.Equal(t, "OK", r)
+//
+// res2, _, err := get("http://localhost:16031")
+// if err != nil {
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2)
+// assert.NotEqual(t, res, res2)
+// c.Stop()
+//}
+//
+//func Test_RPC_Unix(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// sock := `unix://` + os.TempDir() + `/rpc.unix`
+// j := json.ConfigCompatibleWithStandardLibrary
+// data, _ := j.Marshal(sock)
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":` + string(data) + `}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":6032",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// res, _, err := get("http://localhost:6032")
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+// if ss.pool.Workers() != nil && len(ss.pool.Workers()) > 0 {
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res)
+// } else {
+// c.Stop()
+// t.Fatal("no workers initialized")
+// }
+//
+// cl, err := rs.Client()
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+//
+// r := ""
+// assert.NoError(t, cl.Call("http.Reset", true, &r))
+// assert.Equal(t, "OK", r)
+//
+// res2, _, err := get("http://localhost:6032")
+// if err != nil {
+// c.Stop()
+// t.Fatal(err)
+// }
+// assert.Equal(t, strconv.Itoa(*ss.pool.Workers()[0].Pid), res2)
+// assert.NotEqual(t, res, res2)
+// c.Stop()
+//}
+//
+//func Test_Workers(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(rpc.ID, &rpc.Service{})
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{
+// rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
+// httpCfg: `{
+// "enable": true,
+// "address": ":6033",
+// "maxRequestSize": 1024,
+// "uploads": {
+// "dir": ` + tmpDir() + `,
+// "forbid": []
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php pid pipes",
+// "relay": "pipes",
+// "pool": {
+// "numWorkers": 1,
+// "allocateTimeout": 10000000,
+// "destroyTimeout": 10000000
+// }
+// }
+// }`}))
+//
+// s, _ := c.Get(ID)
+// ss := s.(*Service)
+//
+// s2, _ := c.Get(rpc.ID)
+// rs := s2.(*rpc.Service)
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// cl, err := rs.Client()
+// assert.NoError(t, err)
+//
+// r := &WorkerList{}
+// assert.NoError(t, cl.Call("http.Workers", true, &r))
+// assert.Len(t, r.Workers, 1)
+//
+// assert.Equal(t, *ss.pool.Workers()[0].Pid, r.Workers[0].Pid)
+// c.Stop()
+//}
+//
+//func Test_Errors(t *testing.T) {
+// r := &rpcServer{nil}
+//
+// assert.Error(t, r.Reset(true, nil))
+// assert.Error(t, r.Workers(true, nil))
+//}
diff --git a/plugins/http/ssl_test.go b/plugins/http/ssl_test.go
index cf147be9..df09aef5 100644
--- a/plugins/http/ssl_test.go
+++ b/plugins/http/ssl_test.go
@@ -1,254 +1,255 @@
package http
-import (
- "crypto/tls"
- "github.com/sirupsen/logrus"
- "github.com/sirupsen/logrus/hooks/test"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "io/ioutil"
- "net/http"
- "testing"
- "time"
-)
-
-var sslClient = &http.Client{
- Transport: &http.Transport{
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: true,
- },
- },
-}
-
-func Test_SSL_Service_Echo(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{httpCfg: `{
- "address": ":6029",
- "ssl": {
- "port": 6900,
- "key": "fixtures/server.key",
- "cert": "fixtures/server.crt"
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "pool": {"numWorkers": 1}
- }
- }`}))
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil)
- assert.NoError(t, err)
-
- r, err := sslClient.Do(req)
- assert.NoError(t, err)
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
- err2 := r.Body.Close()
- if err2 != nil {
- t.Errorf("fail to close the Body: error %v", err2)
- }
-
- c.Stop()
-}
-
-func Test_SSL_Service_NoRedirect(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{httpCfg: `{
- "address": ":6030",
- "ssl": {
- "port": 6901,
- "key": "fixtures/server.key",
- "cert": "fixtures/server.crt"
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "pool": {"numWorkers": 1}
- }
- }`}))
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
- assert.NoError(t, err)
-
- r, err := sslClient.Do(req)
- assert.NoError(t, err)
-
- assert.Nil(t, r.TLS)
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
- err2 := r.Body.Close()
- if err2 != nil {
- t.Errorf("fail to close the Body: error %v", err2)
- }
- c.Stop()
-}
-
-func Test_SSL_Service_Redirect(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{httpCfg: `{
- "address": ":6831",
- "ssl": {
- "port": 6902,
- "redirect": true,
- "key": "fixtures/server.key",
- "cert": "fixtures/server.crt"
- },
- "workers":{
- "command": "php ../../tests/http/client.php echo pipes",
- "pool": {"numWorkers": 1}
- }
- }`}))
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
-
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil)
- assert.NoError(t, err)
-
- r, err := sslClient.Do(req)
- assert.NoError(t, err)
- assert.NotNil(t, r.TLS)
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
- err2 := r.Body.Close()
- if err2 != nil {
- t.Errorf("fail to close the Body: error %v", err2)
- }
- c.Stop()
-}
-
-func Test_SSL_Service_Push(t *testing.T) {
- logger, _ := test.NewNullLogger()
- logger.SetLevel(logrus.DebugLevel)
-
- c := service.NewContainer(logger)
- c.Register(ID, &Service{})
-
- assert.NoError(t, c.Init(&testCfg{httpCfg: `{
- "address": ":6032",
- "ssl": {
- "port": 6903,
- "redirect": true,
- "key": "fixtures/server.key",
- "cert": "fixtures/server.crt"
- },
- "workers":{
- "command": "php ../../tests/http/client.php push pipes",
- "pool": {"numWorkers": 1}
- }
- }`}))
-
- s, st := c.Get(ID)
- assert.NotNil(t, s)
- assert.Equal(t, service.StatusOK, st)
-
- // should do nothing
- s.(*Service).Stop()
-
- go func() {
- err := c.Serve()
- if err != nil {
- t.Errorf("error during the Serve: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 500)
-
- req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil)
- assert.NoError(t, err)
-
- r, err := sslClient.Do(req)
- assert.NoError(t, err)
-
- assert.NotNil(t, r.TLS)
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.Equal(t, "", r.Header.Get("Http2-Push"))
-
- assert.NoError(t, err)
- assert.Equal(t, 201, r.StatusCode)
- assert.Equal(t, "WORLD", string(b))
-
-
- err2 := r.Body.Close()
- if err2 != nil {
- t.Errorf("fail to close the Body: error %v", err2)
- }
- c.Stop()
-}
+//
+//import (
+// "crypto/tls"
+// "github.com/sirupsen/logrus"
+// "github.com/sirupsen/logrus/hooks/test"
+// "github.com/spiral/roadrunner/service"
+// "github.com/stretchr/testify/assert"
+// "io/ioutil"
+// "net/http"
+// "testing"
+// "time"
+//)
+//
+//var sslClient = &http.Client{
+// Transport: &http.Transport{
+// TLSClientConfig: &tls.Config{
+// InsecureSkipVerify: true,
+// },
+// },
+//}
+//
+//func Test_SSL_Service_Echo(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6029",
+// "ssl": {
+// "port": 6900,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "https://localhost:6900?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+//
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_NoRedirect(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6030",
+// "ssl": {
+// "port": 6901,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// assert.Nil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_Redirect(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6831",
+// "ssl": {
+// "port": 6902,
+// "redirect": true,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php echo pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+//
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "http://localhost:6831?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+// assert.NotNil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
+//
+//func Test_SSL_Service_Push(t *testing.T) {
+// logger, _ := test.NewNullLogger()
+// logger.SetLevel(logrus.DebugLevel)
+//
+// c := service.NewContainer(logger)
+// c.Register(ID, &Service{})
+//
+// assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+// "address": ":6032",
+// "ssl": {
+// "port": 6903,
+// "redirect": true,
+// "key": "fixtures/server.key",
+// "cert": "fixtures/server.crt"
+// },
+// "workers":{
+// "command": "php ../../tests/http/client.php push pipes",
+// "pool": {"numWorkers": 1}
+// }
+// }`}))
+//
+// s, st := c.Get(ID)
+// assert.NotNil(t, s)
+// assert.Equal(t, service.StatusOK, st)
+//
+// // should do nothing
+// s.(*Service).Stop()
+//
+// go func() {
+// err := c.Serve()
+// if err != nil {
+// t.Errorf("error during the Serve: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 500)
+//
+// req, err := http.NewRequest("GET", "https://localhost:6903?hello=world", nil)
+// assert.NoError(t, err)
+//
+// r, err := sslClient.Do(req)
+// assert.NoError(t, err)
+//
+// assert.NotNil(t, r.TLS)
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.Equal(t, "", r.Header.Get("Http2-Push"))
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 201, r.StatusCode)
+// assert.Equal(t, "WORLD", string(b))
+//
+//
+// err2 := r.Body.Close()
+// if err2 != nil {
+// t.Errorf("fail to close the Body: error %v", err2)
+// }
+// c.Stop()
+//}
diff --git a/plugins/http/uploads_test.go b/plugins/http/uploads_test.go
index 16365f39..b023b28f 100644
--- a/plugins/http/uploads_test.go
+++ b/plugins/http/uploads_test.go
@@ -1,434 +1,435 @@
package http
-import (
- "bytes"
- "context"
- "crypto/md5"
- "encoding/hex"
- "fmt"
- "io"
- "io/ioutil"
- "mime/multipart"
- "net/http"
- "os"
- "testing"
- "time"
-
- json "github.com/json-iterator/go"
- "github.com/stretchr/testify/assert"
-)
-
-func TestHandler_Upload_File(t *testing.T) {
- h := &Handler{
- cfg: &Config{
- MaxRequestSize: 1024,
- Uploads: &UploadsConfig{
- Dir: os.TempDir(),
- Forbid: []string{},
- },
- },
- rr: roadrunner.NewServer(&roadrunner.ServerConfig{
- Command: "php ../../tests/http/client.php upload pipes",
- Relay: "pipes",
- Pool: &roadrunner.Config{
- NumWorkers: 1,
- AllocateTimeout: 10000000,
- DestroyTimeout: 10000000,
- },
- }),
- }
-
- assert.NoError(t, h.rr.Start())
- defer h.rr.Stop()
-
- hs := &http.Server{Addr: ":8021", Handler: h}
- defer func() {
- err := hs.Shutdown(context.Background())
- if err != nil {
- t.Errorf("error during the shutdown: error %v", err)
- }
- }()
-
- go func() {
- err := hs.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- t.Errorf("error listening the interface: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 10)
-
- var mb bytes.Buffer
- w := multipart.NewWriter(&mb)
-
- f := mustOpen("uploads_test.go")
- defer func() {
- err := f.Close()
- if err != nil {
- t.Errorf("failed to close a file: error %v", err)
- }
- }()
- fw, err := w.CreateFormFile("upload", f.Name())
- assert.NotNil(t, fw)
- assert.NoError(t, err)
- _, err = io.Copy(fw, f)
- if err != nil {
- t.Errorf("error copying the file: error %v", err)
- }
-
- err = w.Close()
- if err != nil {
- t.Errorf("error closing the file: error %v", err)
- }
-
- req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
- assert.NoError(t, err)
-
- req.Header.Set("Content-Type", w.FormDataContentType())
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- err := r.Body.Close()
- if err != nil {
- t.Errorf("error closing the Body: error %v", err)
- }
- }()
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
-
- fs := fileString("uploads_test.go", 0, "application/octet-stream")
-
- assert.Equal(t, `{"upload":`+fs+`}`, string(b))
-}
-
-func TestHandler_Upload_NestedFile(t *testing.T) {
- h := &Handler{
- cfg: &Config{
- MaxRequestSize: 1024,
- Uploads: &UploadsConfig{
- Dir: os.TempDir(),
- Forbid: []string{},
- },
- },
- rr: roadrunner.NewServer(&roadrunner.ServerConfig{
- Command: "php ../../tests/http/client.php upload pipes",
- Relay: "pipes",
- Pool: &roadrunner.Config{
- NumWorkers: 1,
- AllocateTimeout: 10000000,
- DestroyTimeout: 10000000,
- },
- }),
- }
-
- assert.NoError(t, h.rr.Start())
- defer h.rr.Stop()
-
- hs := &http.Server{Addr: ":8021", Handler: h}
- defer func() {
- err := hs.Shutdown(context.Background())
- if err != nil {
- t.Errorf("error during the shutdown: error %v", err)
- }
- }()
-
- go func() {
- err := hs.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- t.Errorf("error listening the interface: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 10)
-
- var mb bytes.Buffer
- w := multipart.NewWriter(&mb)
-
- f := mustOpen("uploads_test.go")
- defer func() {
- err := f.Close()
- if err != nil {
- t.Errorf("failed to close a file: error %v", err)
- }
- }()
- fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
- assert.NotNil(t, fw)
- assert.NoError(t, err)
- _, err = io.Copy(fw, f)
- if err != nil {
- t.Errorf("error copying the file: error %v", err)
- }
-
- err = w.Close()
- if err != nil {
- t.Errorf("error closing the file: error %v", err)
- }
-
- req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
- assert.NoError(t, err)
-
- req.Header.Set("Content-Type", w.FormDataContentType())
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- err := r.Body.Close()
- if err != nil {
- t.Errorf("error closing the Body: error %v", err)
- }
- }()
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
-
- fs := fileString("uploads_test.go", 0, "application/octet-stream")
-
- assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
-}
-
-func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
- h := &Handler{
- cfg: &Config{
- MaxRequestSize: 1024,
- Uploads: &UploadsConfig{
- Dir: "-----",
- Forbid: []string{},
- },
- },
- rr: roadrunner.NewServer(&roadrunner.ServerConfig{
- Command: "php ../../tests/http/client.php upload pipes",
- Relay: "pipes",
- Pool: &roadrunner.Config{
- NumWorkers: 1,
- AllocateTimeout: 10000000,
- DestroyTimeout: 10000000,
- },
- }),
- }
-
- assert.NoError(t, h.rr.Start())
- defer h.rr.Stop()
-
- hs := &http.Server{Addr: ":8021", Handler: h}
- defer func() {
- err := hs.Shutdown(context.Background())
- if err != nil {
- t.Errorf("error during the shutdown: error %v", err)
- }
- }()
-
- go func() {
- err := hs.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- t.Errorf("error listening the interface: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 10)
-
- var mb bytes.Buffer
- w := multipart.NewWriter(&mb)
-
- f := mustOpen("uploads_test.go")
- defer func() {
- err := f.Close()
- if err != nil {
- t.Errorf("failed to close a file: error %v", err)
- }
- }()
- fw, err := w.CreateFormFile("upload", f.Name())
- assert.NotNil(t, fw)
- assert.NoError(t, err)
- _, err = io.Copy(fw, f)
- if err != nil {
- t.Errorf("error copying the file: error %v", err)
- }
-
- err = w.Close()
- if err != nil {
- t.Errorf("error closing the file: error %v", err)
- }
-
- req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
- assert.NoError(t, err)
-
- req.Header.Set("Content-Type", w.FormDataContentType())
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- err := r.Body.Close()
- if err != nil {
- t.Errorf("error closing the Body: error %v", err)
- }
- }()
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
-
- fs := fileString("uploads_test.go", 5, "application/octet-stream")
-
- assert.Equal(t, `{"upload":`+fs+`}`, string(b))
-}
-
-func TestHandler_Upload_File_Forbids(t *testing.T) {
- h := &Handler{
- cfg: &Config{
- MaxRequestSize: 1024,
- Uploads: &UploadsConfig{
- Dir: os.TempDir(),
- Forbid: []string{".go"},
- },
- },
- rr: roadrunner.NewServer(&roadrunner.ServerConfig{
- Command: "php ../../tests/http/client.php upload pipes",
- Relay: "pipes",
- Pool: &roadrunner.Config{
- NumWorkers: 1,
- AllocateTimeout: 10000000,
- DestroyTimeout: 10000000,
- },
- }),
- }
-
- assert.NoError(t, h.rr.Start())
- defer h.rr.Stop()
-
- hs := &http.Server{Addr: ":8021", Handler: h}
- defer func() {
- err := hs.Shutdown(context.Background())
- if err != nil {
- t.Errorf("error during the shutdown: error %v", err)
- }
- }()
-
- go func() {
- err := hs.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- t.Errorf("error listening the interface: error %v", err)
- }
- }()
- time.Sleep(time.Millisecond * 10)
-
- var mb bytes.Buffer
- w := multipart.NewWriter(&mb)
-
- f := mustOpen("uploads_test.go")
- defer func() {
- err := f.Close()
- if err != nil {
- t.Errorf("failed to close a file: error %v", err)
- }
- }()
- fw, err := w.CreateFormFile("upload", f.Name())
- assert.NotNil(t, fw)
- assert.NoError(t, err)
- _, err = io.Copy(fw, f)
- if err != nil {
- t.Errorf("error copying the file: error %v", err)
- }
-
- err = w.Close()
- if err != nil {
- t.Errorf("error closing the file: error %v", err)
- }
-
- req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
- assert.NoError(t, err)
-
- req.Header.Set("Content-Type", w.FormDataContentType())
-
- r, err := http.DefaultClient.Do(req)
- assert.NoError(t, err)
- defer func() {
- err := r.Body.Close()
- if err != nil {
- t.Errorf("error closing the Body: error %v", err)
- }
- }()
-
- b, err := ioutil.ReadAll(r.Body)
- assert.NoError(t, err)
-
- assert.NoError(t, err)
- assert.Equal(t, 200, r.StatusCode)
-
- fs := fileString("uploads_test.go", 7, "application/octet-stream")
-
- assert.Equal(t, `{"upload":`+fs+`}`, string(b))
-}
-
-func Test_FileExists(t *testing.T) {
- assert.True(t, exists("uploads_test.go"))
- assert.False(t, exists("uploads_test."))
-}
-
-func mustOpen(f string) *os.File {
- r, err := os.Open(f)
- if err != nil {
- panic(err)
- }
- return r
-}
-
-type fInfo struct {
- Name string `json:"name"`
- Size int64 `json:"size"`
- Mime string `json:"mime"`
- Error int `json:"error"`
- MD5 string `json:"md5,omitempty"`
-}
-
-func fileString(f string, errNo int, mime string) string {
- s, err := os.Stat(f)
- if err != nil {
- fmt.Println(fmt.Errorf("error stat the file, error: %v", err))
- }
-
- ff, err := os.Open(f)
- if err != nil {
- fmt.Println(fmt.Errorf("error opening the file, error: %v", err))
- }
-
- defer func() {
- er := ff.Close()
- if er != nil {
- fmt.Println(fmt.Errorf("error closing the file, error: %v", er))
- }
- }()
-
- h := md5.New()
- _, err = io.Copy(h, ff)
- if err != nil {
- fmt.Println(fmt.Errorf("error copying the file, error: %v", err))
- }
-
- v := &fInfo{
- Name: s.Name(),
- Size: s.Size(),
- Error: errNo,
- Mime: mime,
- MD5: hex.EncodeToString(h.Sum(nil)),
- }
-
- if errNo != 0 {
- v.MD5 = ""
- v.Size = 0
- }
-
- j := json.ConfigCompatibleWithStandardLibrary
- r, err := j.Marshal(v)
- if err != nil {
- fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err))
- }
- return string(r)
-
-}
+//
+//import (
+// "bytes"
+// "context"
+// "crypto/md5"
+// "encoding/hex"
+// "fmt"
+// "io"
+// "io/ioutil"
+// "mime/multipart"
+// "net/http"
+// "os"
+// "testing"
+// "time"
+//
+// json "github.com/json-iterator/go"
+// "github.com/stretchr/testify/assert"
+//)
+//
+//func TestHandler_Upload_File(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 0, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func TestHandler_Upload_NestedFile(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 0, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
+//}
+//
+//func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: "-----",
+// Forbid: []string{},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 5, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func TestHandler_Upload_File_Forbids(t *testing.T) {
+// h := &Handler{
+// cfg: &Config{
+// MaxRequestSize: 1024,
+// Uploads: &UploadsConfig{
+// Dir: os.TempDir(),
+// Forbid: []string{".go"},
+// },
+// },
+// pool: roadrunner.NewServer(&roadrunner.ServerConfig{
+// Command: "php ../../tests/http/client.php upload pipes",
+// Relay: "pipes",
+// Pool: &roadrunner.Config{
+// NumWorkers: 1,
+// AllocateTimeout: 10000000,
+// DestroyTimeout: 10000000,
+// },
+// }),
+// }
+//
+// assert.NoError(t, h.pool.Start())
+// defer h.pool.Stop()
+//
+// hs := &http.Server{Addr: ":8021", Handler: h}
+// defer func() {
+// err := hs.Shutdown(context.Background())
+// if err != nil {
+// t.Errorf("error during the shutdown: error %v", err)
+// }
+// }()
+//
+// go func() {
+// err := hs.ListenAndServe()
+// if err != nil && err != http.ErrServerClosed {
+// t.Errorf("error listening the interface: error %v", err)
+// }
+// }()
+// time.Sleep(time.Millisecond * 10)
+//
+// var mb bytes.Buffer
+// w := multipart.NewWriter(&mb)
+//
+// f := mustOpen("uploads_test.go")
+// defer func() {
+// err := f.Close()
+// if err != nil {
+// t.Errorf("failed to close a file: error %v", err)
+// }
+// }()
+// fw, err := w.CreateFormFile("upload", f.Name())
+// assert.NotNil(t, fw)
+// assert.NoError(t, err)
+// _, err = io.Copy(fw, f)
+// if err != nil {
+// t.Errorf("error copying the file: error %v", err)
+// }
+//
+// err = w.Close()
+// if err != nil {
+// t.Errorf("error closing the file: error %v", err)
+// }
+//
+// req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
+// assert.NoError(t, err)
+//
+// req.Header.Set("Content-Type", w.FormDataContentType())
+//
+// r, err := http.DefaultClient.Do(req)
+// assert.NoError(t, err)
+// defer func() {
+// err := r.Body.Close()
+// if err != nil {
+// t.Errorf("error closing the Body: error %v", err)
+// }
+// }()
+//
+// b, err := ioutil.ReadAll(r.Body)
+// assert.NoError(t, err)
+//
+// assert.NoError(t, err)
+// assert.Equal(t, 200, r.StatusCode)
+//
+// fs := fileString("uploads_test.go", 7, "application/octet-stream")
+//
+// assert.Equal(t, `{"upload":`+fs+`}`, string(b))
+//}
+//
+//func Test_FileExists(t *testing.T) {
+// assert.True(t, exists("uploads_test.go"))
+// assert.False(t, exists("uploads_test."))
+//}
+//
+//func mustOpen(f string) *os.File {
+// r, err := os.Open(f)
+// if err != nil {
+// panic(err)
+// }
+// return r
+//}
+//
+//type fInfo struct {
+// Name string `json:"name"`
+// Size int64 `json:"size"`
+// Mime string `json:"mime"`
+// Error int `json:"error"`
+// MD5 string `json:"md5,omitempty"`
+//}
+//
+//func fileString(f string, errNo int, mime string) string {
+// s, err := os.Stat(f)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error stat the file, error: %v", err))
+// }
+//
+// ff, err := os.Open(f)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error opening the file, error: %v", err))
+// }
+//
+// defer func() {
+// er := ff.Close()
+// if er != nil {
+// fmt.Println(fmt.Errorf("error closing the file, error: %v", er))
+// }
+// }()
+//
+// h := md5.New()
+// _, err = io.Copy(h, ff)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error copying the file, error: %v", err))
+// }
+//
+// v := &fInfo{
+// Name: s.Name(),
+// Size: s.Size(),
+// Error: errNo,
+// Mime: mime,
+// MD5: hex.EncodeToString(h.Sum(nil)),
+// }
+//
+// if errNo != 0 {
+// v.MD5 = ""
+// v.Size = 0
+// }
+//
+// j := json.ConfigCompatibleWithStandardLibrary
+// r, err := j.Marshal(v)
+// if err != nil {
+// fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err))
+// }
+// return string(r)
+//
+//}
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index 840021eb..4d31138e 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -30,11 +30,11 @@ var testPoolConfig = roadrunner.PoolConfig{
type Foo struct {
configProvider config.Configurer
- wf server.WorkerFactory
+ wf server.Server
pool roadrunner.Pool
}
-func (f *Foo) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
+func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index b12f4ead..4942d4c5 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -12,11 +12,11 @@ import (
type Foo2 struct {
configProvider config.Configurer
- wf server.WorkerFactory
+ wf server.Server
pool roadrunner.Pool
}
-func (f *Foo2) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
+func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 39044577..89757a02 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -12,11 +12,11 @@ import (
type Foo3 struct {
configProvider config.Configurer
- wf server.WorkerFactory
+ wf server.Server
pool roadrunner.Pool
}
-func (f *Foo3) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
+func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php
index f83c3dd4..cd657502 100644
--- a/src/Exception/RoadRunnerException.php
+++ b/src/Exception/RoadRunnerException.php
@@ -9,6 +9,6 @@ declare(strict_types=1);
namespace Spiral\RoadRunner\Exception;
-class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException
+class RoadRunnerException extends \RuntimeException
{
}
diff --git a/src/Http/HttpClient.php b/src/Http/HttpClient.php
new file mode 100644
index 00000000..4ca152c8
--- /dev/null
+++ b/src/Http/HttpClient.php
@@ -0,0 +1,75 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Alex Bond
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+final class HttpClient
+{
+ /** @var Worker */
+ private $worker;
+
+ /**
+ * @param Worker $worker
+ */
+ public function __construct(Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->worker;
+ }
+
+ /**
+ * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string]
+ * or null if termination request or invalid context.
+ */
+ public function acceptRequest(): ?array
+ {
+ $body = $this->getWorker()->receive($ctx);
+ if (empty($body) && empty($ctx)) {
+ // termination request
+ return null;
+ }
+
+ $ctx = json_decode($ctx, true);
+ if ($ctx === null) {
+ // invalid context
+ return null;
+ }
+
+ return ['ctx' => $ctx, 'body' => $body];
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param int $status Http status code
+ * @param string $body Body of response
+ * @param string[][] $headers An associative array of the message's headers. Each
+ * key MUST be a header name, and each value MUST be an array of strings
+ * for that header.
+ */
+ public function respond(int $status, string $body, array $headers = []): void
+ {
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->getWorker()->send(
+ $body,
+ (string) json_encode(['status' => $status, 'headers' => $headers])
+ );
+ }
+}
diff --git a/src/Http/PSR7Client.php b/src/Http/PSR7Client.php
new file mode 100644
index 00000000..777dd891
--- /dev/null
+++ b/src/Http/PSR7Client.php
@@ -0,0 +1,217 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestFactoryInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Message\StreamFactoryInterface;
+use Psr\Http\Message\UploadedFileFactoryInterface;
+use Psr\Http\Message\UploadedFileInterface;
+
+/**
+ * Manages PSR-7 request and response.
+ */
+class PSR7Client
+{
+ /** @var HttpClient */
+ private $httpClient;
+
+ /** @var ServerRequestFactoryInterface */
+ private $requestFactory;
+
+ /** @var StreamFactoryInterface */
+ private $streamFactory;
+
+ /** @var UploadedFileFactoryInterface */
+ private $uploadsFactory;
+
+ /** @var mixed[] */
+ private $originalServer = [];
+
+ /** @var string[] Valid values for HTTP protocol version */
+ private static $allowedVersions = ['1.0', '1.1', '2',];
+
+ /**
+ * @param Worker $worker
+ * @param ServerRequestFactoryInterface|null $requestFactory
+ * @param StreamFactoryInterface|null $streamFactory
+ * @param UploadedFileFactoryInterface|null $uploadsFactory
+ */
+ public function __construct(
+ Worker $worker,
+ ServerRequestFactoryInterface $requestFactory = null,
+ StreamFactoryInterface $streamFactory = null,
+ UploadedFileFactoryInterface $uploadsFactory = null
+ ) {
+ $this->httpClient = new HttpClient($worker);
+ $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory();
+ $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory();
+ $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory();
+ $this->originalServer = $_SERVER;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->httpClient->getWorker();
+ }
+
+ /**
+ * @return ServerRequestInterface|null
+ */
+ public function acceptRequest(): ?ServerRequestInterface
+ {
+ $rawRequest = $this->httpClient->acceptRequest();
+ if ($rawRequest === null) {
+ return null;
+ }
+
+ $_SERVER = $this->configureServer($rawRequest['ctx']);
+
+ $request = $this->requestFactory->createServerRequest(
+ $rawRequest['ctx']['method'],
+ $rawRequest['ctx']['uri'],
+ $_SERVER
+ );
+
+ parse_str($rawRequest['ctx']['rawQuery'], $query);
+
+ $request = $request
+ ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol']))
+ ->withCookieParams($rawRequest['ctx']['cookies'])
+ ->withQueryParams($query)
+ ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads']));
+
+ foreach ($rawRequest['ctx']['attributes'] as $name => $value) {
+ $request = $request->withAttribute($name, $value);
+ }
+
+ foreach ($rawRequest['ctx']['headers'] as $name => $value) {
+ $request = $request->withHeader($name, $value);
+ }
+
+ if ($rawRequest['ctx']['parsed']) {
+ return $request->withParsedBody(json_decode($rawRequest['body'], true));
+ }
+
+ if ($rawRequest['body'] !== null) {
+ return $request->withBody($this->streamFactory->createStream($rawRequest['body']));
+ }
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param ResponseInterface $response
+ */
+ public function respond(ResponseInterface $response): void
+ {
+ $this->httpClient->respond(
+ $response->getStatusCode(),
+ $response->getBody()->__toString(),
+ $response->getHeaders()
+ );
+ }
+
+ /**
+ * Returns altered copy of _SERVER variable. Sets ip-address,
+ * request-time and other values.
+ *
+ * @param mixed[] $ctx
+ * @return mixed[]
+ */
+ protected function configureServer(array $ctx): array
+ {
+ $server = $this->originalServer;
+
+ $server['REQUEST_URI'] = $ctx['uri'];
+ $server['REQUEST_TIME'] = time();
+ $server['REQUEST_TIME_FLOAT'] = microtime(true);
+ $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
+ $server['REQUEST_METHOD'] = $ctx['method'];
+
+ $server['HTTP_USER_AGENT'] = '';
+ foreach ($ctx['headers'] as $key => $value) {
+ $key = strtoupper(str_replace('-', '_', $key));
+ if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
+ $server[$key] = implode(', ', $value);
+ } else {
+ $server['HTTP_' . $key] = implode(', ', $value);
+ }
+ }
+
+ return $server;
+ }
+
+ /**
+ * Wraps all uploaded files with UploadedFile.
+ *
+ * @param array[] $files
+ *
+ * @return UploadedFileInterface[]|mixed[]
+ */
+ private function wrapUploads($files): array
+ {
+ if (empty($files)) {
+ return [];
+ }
+
+ $result = [];
+ foreach ($files as $index => $f) {
+ if (!isset($f['name'])) {
+ $result[$index] = $this->wrapUploads($f);
+ continue;
+ }
+
+ if (UPLOAD_ERR_OK === $f['error']) {
+ $stream = $this->streamFactory->createStreamFromFile($f['tmpName']);
+ } else {
+ $stream = $this->streamFactory->createStream();
+ }
+
+ $result[$index] = $this->uploadsFactory->createUploadedFile(
+ $stream,
+ $f['size'],
+ $f['error'],
+ $f['name'],
+ $f['mime']
+ );
+ }
+
+ return $result;
+ }
+
+ /**
+ * Normalize HTTP protocol version to valid values
+ *
+ * @param string $version
+ * @return string
+ */
+ private static function fetchProtocolVersion(string $version): string
+ {
+ $v = substr($version, 5);
+
+ if ($v === '2.0') {
+ return '2';
+ }
+
+ // Fallback for values outside of valid protocol versions
+ if (!in_array($v, static::$allowedVersions, true)) {
+ return '1.1';
+ }
+
+ return $v;
+ }
+}
diff --git a/src/Logger/.empty b/src/Logger/.empty
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/Logger/.empty
diff --git a/src/Metrics/Metrics.php b/src/Metrics/Metrics.php
new file mode 100644
index 00000000..d6b6e1da
--- /dev/null
+++ b/src/Metrics/Metrics.php
@@ -0,0 +1,80 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exceptions\RPCException;
+use Spiral\Goridge\RPC;
+use Spiral\RoadRunner\Exception\MetricException;
+
+/**
+ * Application metrics.
+ */
+final class Metrics implements MetricsInterface
+{
+ /** @var RPC */
+ private $rpc;
+
+ /**
+ * @param RPC $rpc
+ */
+ public function __construct(RPC $rpc)
+ {
+ $this->rpc = $rpc;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function add(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sub(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function observe(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function set(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+}
diff --git a/src/Metrics/MetricsInterface.php b/src/Metrics/MetricsInterface.php
new file mode 100644
index 00000000..ec2009b0
--- /dev/null
+++ b/src/Metrics/MetricsInterface.php
@@ -0,0 +1,64 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\MetricException;
+
+interface MetricsInterface
+{
+ /**
+ * Add collector value. Fallback to appropriate method of related collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function add(string $collector, float $value, array $labels = []);
+
+ /**
+ * Subtract the collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function sub(string $collector, float $value, array $labels = []);
+
+ /**
+ * Observe collector value, only for histogram and summary collectors.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function observe(string $collector, float $value, array $labels = []);
+
+ /**
+ * Set collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function set(string $collector, float $value, array $labels = []);
+}
diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/WorkerInterface.php