summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-01-05 13:35:58 +0300
committerWolfy-J <[email protected]>2019-01-05 13:35:58 +0300
commit04f7ac73c477938b7390ec6ec101d0e4bcc8cd80 (patch)
tree361c8cd8a00824d0b2e5003c6a5978f630c20469
parent46009112a783a1fdae95e0a061d4c8c41a1c8ff1 (diff)
second set of patches
-rw-r--r--Makefile2
-rw-r--r--osutil/cmd_group.go13
-rw-r--r--osutil/cmd_group_win.go13
-rw-r--r--pool.go2
-rw-r--r--server.go2
-rw-r--r--server_config.go4
-rw-r--r--service/env/environment.go2
-rw-r--r--service/http/handler.go43
-rw-r--r--service/http/handler_test.go380
-rw-r--r--service/http/service.go5
-rw-r--r--service/http/uploads_test.go49
-rw-r--r--service/rpc/config.go2
-rw-r--r--tests/http/echoDelay.php11
-rw-r--r--tests/http/payload.php5
-rw-r--r--util/fasttime.go36
-rw-r--r--util/fasttime_test.go46
16 files changed, 482 insertions, 133 deletions
diff --git a/Makefile b/Makefile
index cc7979cd..345fd4b7 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ uninstall:
rm -f /usr/local/bin/rr
test:
go test -v -race -cover
- go test -v -race -cover ./util
+ go test -v -race -cover ./util
go test -v -race -cover ./service
go test -v -race -cover ./service/env
go test -v -race -cover ./service/rpc
diff --git a/osutil/cmd_group.go b/osutil/cmd_group.go
new file mode 100644
index 00000000..d4b64fb6
--- /dev/null
+++ b/osutil/cmd_group.go
@@ -0,0 +1,13 @@
+// +build !windows
+
+package osutil
+
+import (
+ "os/exec"
+ "syscall"
+)
+
+// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
+func IsolateProcess(cmd *exec.Cmd) {
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+}
diff --git a/osutil/cmd_group_win.go b/osutil/cmd_group_win.go
new file mode 100644
index 00000000..ca7fca20
--- /dev/null
+++ b/osutil/cmd_group_win.go
@@ -0,0 +1,13 @@
+// +build windows
+
+package osutil
+
+import (
+ "os/exec"
+ "syscall"
+)
+
+// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
+func IsolateProcess(cmd *exec.Cmd) {
+ cmd.SysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP}
+}
diff --git a/pool.go b/pool.go
index 30c30dfa..7dfea26c 100644
--- a/pool.go
+++ b/pool.go
@@ -22,7 +22,7 @@ const (
// Pool managed set of inner worker processes.
type Pool interface {
- // AddListener all caused events to attached watcher.
+ // Listen all caused events to attached watcher.
Listen(l func(event int, ctx interface{}))
// Exec one task with given payload and context, returns result or error.
diff --git a/server.go b/server.go
index 48741d20..25ce7945 100644
--- a/server.go
+++ b/server.go
@@ -176,7 +176,7 @@ func (s *Server) Pool() Pool {
return s.pool
}
-// AddListener pool events.
+// Listen pool events.
func (s *Server) poolListener(event int, ctx interface{}) {
if event == EventPoolError {
// pool failure, rebuilding
diff --git a/server_config.go b/server_config.go
index 199ce27b..35965962 100644
--- a/server_config.go
+++ b/server_config.go
@@ -3,6 +3,7 @@ package roadrunner
import (
"errors"
"fmt"
+ "github.com/spiral/roadrunner/osutil"
"net"
"os"
"os/exec"
@@ -75,8 +76,7 @@ func (cfg *ServerConfig) makeCommand() func() *exec.Cmd {
var cmd = strings.Split(cfg.Command, " ")
return func() *exec.Cmd {
cmd := exec.Command(cmd[0], cmd[1:]...)
-
- cmd.SysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP}
+ osutil.IsolateProcess(cmd)
cmd.Env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay))
cmd.Env = append(cmd.Env, cfg.env...)
diff --git a/service/env/environment.go b/service/env/environment.go
index fe8f1dd8..ab8febf7 100644
--- a/service/env/environment.go
+++ b/service/env/environment.go
@@ -16,7 +16,7 @@ type Setter interface {
SetEnv(key, value string)
}
-// Setter provides ability to set environment value.
+// Getter provides ability to set environment value.
type Getter interface {
// GetEnv must return list of env variables.
GetEnv() (map[string]string, error)
diff --git a/service/http/handler.go b/service/http/handler.go
index d7521959..8ff1bdeb 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -3,9 +3,11 @@ package http
import (
"github.com/pkg/errors"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
"net/http"
"strconv"
"sync"
+ "time"
)
const (
@@ -23,6 +25,14 @@ type ErrorEvent struct {
// Error - associated error, if any.
Error error
+
+ // event timings
+ start, end int64
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ErrorEvent) Elapsed() time.Duration {
+ return time.Duration(e.end - e.start)
}
// ResponseEvent represents singular http response event.
@@ -32,11 +42,20 @@ type ResponseEvent struct {
// Response contains service response.
Response *Response
+
+ // event timings
+ start, end int64
+}
+
+// Elapsed returns duration of the invocation.
+func (e *ResponseEvent) Elapsed() time.Duration {
+ return time.Duration(e.end - e.start)
}
// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
// parsed files and query, payload will include parsed form dataTree (if any).
type Handler struct {
+ ft *util.FastTime
cfg *Config
rr *roadrunner.Server
mul sync.Mutex
@@ -53,14 +72,16 @@ func (h *Handler) Listen(l func(event int, ctx interface{})) {
// mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ start := h.ft.UnixNano()
+
// validating request size
if h.cfg.MaxRequest != 0 {
if length := r.Header.Get("content-length"); length != "" {
if size, err := strconv.ParseInt(length, 10, 64); err != nil {
- h.handleError(w, r, err)
+ h.handleError(w, r, err, start)
return
} else if size > h.cfg.MaxRequest*1024*1024 {
- h.handleError(w, r, errors.New("request body max size is exceeded"))
+ h.handleError(w, r, errors.New("request body max size is exceeded"), start)
return
}
}
@@ -68,7 +89,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := NewRequest(r, h.cfg.Uploads)
if err != nil {
- h.handleError(w, r, err)
+ h.handleError(w, r, err, start)
return
}
@@ -77,37 +98,37 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p, err := req.Payload()
if err != nil {
- h.handleError(w, r, err)
+ h.handleError(w, r, err, start)
return
}
rsp, err := h.rr.Exec(p)
if err != nil {
- h.handleError(w, r, err)
+ h.handleError(w, r, err, start)
return
}
resp, err := NewResponse(rsp)
if err != nil {
- h.handleError(w, r, err)
+ h.handleError(w, r, err, start)
return
}
- h.handleResponse(req, resp)
+ h.handleResponse(req, resp, start)
resp.Write(w)
}
// handleError sends error.
-func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) {
- h.throw(EventError, &ErrorEvent{Request: r, Error: err})
+func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start int64) {
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, end: h.ft.UnixNano()})
w.WriteHeader(500)
w.Write([]byte(err.Error()))
}
// handleResponse triggers response event.
-func (h *Handler) handleResponse(req *Request, resp *Response) {
- h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp})
+func (h *Handler) handleResponse(req *Request, resp *Response, start int64) {
+ h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, end: h.ft.UnixNano()})
}
// throw invokes event handler if any.
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index 1750bf43..d8a15202 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
"github.com/stretchr/testify/assert"
"io/ioutil"
"mime/multipart"
@@ -29,8 +30,9 @@ func get(url string) (string, *http.Response, error) {
return string(b), r, err
}
-func TestServer_Echo(t *testing.T) {
- st := &Handler{
+func TestHandler_Echo(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -48,11 +50,12 @@ func TestServer_Echo(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8177", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -65,7 +68,8 @@ func TestServer_Echo(t *testing.T) {
}
func Test_HandlerErrors(t *testing.T) {
- st := &Handler{
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -83,16 +87,18 @@ func Test_HandlerErrors(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
wr := httptest.NewRecorder()
rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data")))
- st.ServeHTTP(wr, rq)
+ h.ServeHTTP(wr, rq)
assert.Equal(t, 500, wr.Code)
}
func Test_Handler_JSON_error(t *testing.T) {
- st := &Handler{
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -110,18 +116,20 @@ func Test_Handler_JSON_error(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
wr := httptest.NewRecorder()
rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd")))
rq.Header.Add("Content-Type", "application/json")
rq.Header.Add("Content-Size", "3")
- st.ServeHTTP(wr, rq)
+ h.ServeHTTP(wr, rq)
assert.Equal(t, 500, wr.Code)
}
-func TestServer_Headers(t *testing.T) {
- st := &Handler{
+func TestHandler_Headers(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -139,11 +147,12 @@ func TestServer_Headers(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8078", Handler: st}
+ hs := &http.Server{Addr: ":8078", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -167,8 +176,9 @@ func TestServer_Headers(t *testing.T) {
assert.Equal(t, "SAMPLE", string(b))
}
-func TestServer_Empty_User_Agent(t *testing.T) {
- st := &Handler{
+func TestHandler_Empty_User_Agent(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -186,11 +196,12 @@ func TestServer_Empty_User_Agent(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8088", Handler: st}
+ hs := &http.Server{Addr: ":8088", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -213,9 +224,9 @@ func TestServer_Empty_User_Agent(t *testing.T) {
assert.Equal(t, "", string(b))
}
-
-func TestServer_User_Agent(t *testing.T) {
- st := &Handler{
+func TestHandler_User_Agent(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -233,11 +244,12 @@ func TestServer_User_Agent(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8088", Handler: st}
+ hs := &http.Server{Addr: ":8088", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -260,8 +272,9 @@ func TestServer_User_Agent(t *testing.T) {
assert.Equal(t, "go-agent", string(b))
}
-func TestServer_Cookies(t *testing.T) {
- st := &Handler{
+func TestHandler_Cookies(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -279,11 +292,12 @@ func TestServer_Cookies(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8079", Handler: st}
+ hs := &http.Server{Addr: ":8079", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -311,8 +325,9 @@ func TestServer_Cookies(t *testing.T) {
}
}
-func TestServer_JsonPayload_POST(t *testing.T) {
- st := &Handler{
+func TestHandler_JsonPayload_POST(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -330,11 +345,12 @@ func TestServer_JsonPayload_POST(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8090", Handler: st}
+ hs := &http.Server{Addr: ":8090", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -361,8 +377,9 @@ func TestServer_JsonPayload_POST(t *testing.T) {
assert.Equal(t, `{"value":"key"}`, string(b))
}
-func TestServer_JsonPayload_PUT(t *testing.T) {
- st := &Handler{
+func TestHandler_JsonPayload_PUT(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -380,11 +397,12 @@ func TestServer_JsonPayload_PUT(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8081", Handler: st}
+ hs := &http.Server{Addr: ":8081", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -407,8 +425,9 @@ func TestServer_JsonPayload_PUT(t *testing.T) {
assert.Equal(t, `{"value":"key"}`, string(b))
}
-func TestServer_JsonPayload_PATCH(t *testing.T) {
- st := &Handler{
+func TestHandler_JsonPayload_PATCH(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -426,11 +445,12 @@ func TestServer_JsonPayload_PATCH(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8082", Handler: st}
+ hs := &http.Server{Addr: ":8082", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -453,8 +473,9 @@ func TestServer_JsonPayload_PATCH(t *testing.T) {
assert.Equal(t, `{"value":"key"}`, string(b))
}
-func TestServer_FormData_POST(t *testing.T) {
- st := &Handler{
+func TestHandler_FormData_POST(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -472,11 +493,12 @@ func TestServer_FormData_POST(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8083", Handler: st}
+ hs := &http.Server{Addr: ":8083", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -511,8 +533,9 @@ func TestServer_FormData_POST(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_FormData_PUT(t *testing.T) {
- st := &Handler{
+func TestHandler_FormData_PUT(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -530,11 +553,12 @@ func TestServer_FormData_PUT(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8084", Handler: st}
+ hs := &http.Server{Addr: ":8084", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -569,8 +593,9 @@ func TestServer_FormData_PUT(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_FormData_PATCH(t *testing.T) {
- st := &Handler{
+func TestHandler_FormData_PATCH(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -588,11 +613,12 @@ func TestServer_FormData_PATCH(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8085", Handler: st}
+ hs := &http.Server{Addr: ":8085", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -627,8 +653,9 @@ func TestServer_FormData_PATCH(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_Multipart_POST(t *testing.T) {
- st := &Handler{
+func TestHandler_Multipart_POST(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -646,11 +673,12 @@ func TestServer_Multipart_POST(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8019", Handler: st}
+ hs := &http.Server{Addr: ":8019", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -689,8 +717,9 @@ func TestServer_Multipart_POST(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_Multipart_PUT(t *testing.T) {
- st := &Handler{
+func TestHandler_Multipart_PUT(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -708,11 +737,12 @@ func TestServer_Multipart_PUT(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8020", Handler: st}
+ hs := &http.Server{Addr: ":8020", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -751,8 +781,9 @@ func TestServer_Multipart_PUT(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_Multipart_PATCH(t *testing.T) {
- st := &Handler{
+func TestHandler_Multipart_PATCH(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -770,11 +801,12 @@ func TestServer_Multipart_PATCH(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8021", Handler: st}
+ hs := &http.Server{Addr: ":8021", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -813,8 +845,9 @@ func TestServer_Multipart_PATCH(t *testing.T) {
assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b))
}
-func TestServer_Error(t *testing.T) {
- st := &Handler{
+func TestHandler_Error(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -832,11 +865,12 @@ func TestServer_Error(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8177", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -847,8 +881,9 @@ func TestServer_Error(t *testing.T) {
assert.Equal(t, 500, r.StatusCode)
}
-func TestServer_Error2(t *testing.T) {
- st := &Handler{
+func TestHandler_Error2(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -866,11 +901,12 @@ func TestServer_Error2(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8177", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -881,8 +917,9 @@ func TestServer_Error2(t *testing.T) {
assert.Equal(t, 500, r.StatusCode)
}
-func TestServer_Error3(t *testing.T) {
- st := &Handler{
+func TestHandler_Error3(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1,
Uploads: &UploadsConfig{
@@ -900,11 +937,12 @@ func TestServer_Error3(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8177", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -926,8 +964,161 @@ func TestServer_Error3(t *testing.T) {
assert.Equal(t, 500, r.StatusCode)
}
+func TestHandler_ResponseDuration(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+ defer h.ft.Stop()
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ gotresp := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventResponse {
+ c := ctx.(*ResponseEvent)
+
+ if c.Elapsed() > 0 {
+ close(gotresp)
+ }
+ }
+ })
+
+ body, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-gotresp
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestHandler_ResponseDurationDelayed(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php echoDelay pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+ defer h.ft.Stop()
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ gotresp := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventResponse {
+ c := ctx.(*ResponseEvent)
+
+ if c.Elapsed() > time.Second {
+ close(gotresp)
+ }
+ }
+ })
+
+ body, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-gotresp
+
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", body)
+}
+
+func TestHandler_ErrorDuration(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
+ cfg: &Config{
+ MaxRequest: 1024,
+ Uploads: &UploadsConfig{
+ Dir: os.TempDir(),
+ Forbid: []string{},
+ },
+ },
+ rr: roadrunner.NewServer(&roadrunner.ServerConfig{
+ Command: "php ../../tests/http/client.php error pipes",
+ Relay: "pipes",
+ Pool: &roadrunner.Config{
+ NumWorkers: 1,
+ AllocateTimeout: 10000000,
+ DestroyTimeout: 10000000,
+ },
+ }),
+ }
+ defer h.ft.Stop()
+
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
+
+ hs := &http.Server{Addr: ":8177", Handler: h}
+ defer hs.Shutdown(context.Background())
+
+ go func() { hs.ListenAndServe() }()
+ time.Sleep(time.Millisecond * 10)
+
+ goterr := make(chan interface{})
+ h.Listen(func(event int, ctx interface{}) {
+ if event == EventError {
+ c := ctx.(*ErrorEvent)
+
+ if c.Elapsed() > 0 {
+ close(goterr)
+ }
+ }
+ })
+
+ _, r, err := get("http://localhost:8177/?hello=world")
+ assert.NoError(t, err)
+
+ <-goterr
+
+ assert.Equal(t, 500, r.StatusCode)
+}
+
func BenchmarkHandler_Listen_Echo(b *testing.B) {
- st := &Handler{
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -945,11 +1136,12 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
},
}),
}
+ defer h.ft.Stop()
- st.rr.Start()
- defer st.rr.Stop()
+ h.rr.Start()
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8177", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
diff --git a/service/http/service.go b/service/http/service.go
index ad59f887..eb97233d 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -7,11 +7,13 @@ import (
"github.com/spiral/roadrunner/service/env"
"github.com/spiral/roadrunner/service/http/attributes"
"github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/util"
"golang.org/x/net/http2"
"net/http"
"net/url"
"strings"
"sync"
+ "time"
)
const (
@@ -77,8 +79,9 @@ func (s *Service) Serve() error {
s.rr = roadrunner.NewServer(s.cfg.Workers)
s.rr.Listen(s.throw)
- s.handler = &Handler{cfg: s.cfg, rr: s.rr}
+ s.handler = &Handler{ft: util.NewFastTime(time.Microsecond), cfg: s.cfg, rr: s.rr}
s.handler.Listen(s.throw)
+ defer s.handler.ft.Stop()
s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
index 96e95733..82e7586b 100644
--- a/service/http/uploads_test.go
+++ b/service/http/uploads_test.go
@@ -7,6 +7,7 @@ import (
"encoding/hex"
"encoding/json"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
"github.com/stretchr/testify/assert"
"io"
"io/ioutil"
@@ -17,8 +18,9 @@ import (
"time"
)
-func TestServer_Upload_File(t *testing.T) {
- st := &Handler{
+func TestHandler_Upload_File(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -36,11 +38,12 @@ func TestServer_Upload_File(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8021", Handler: st}
+ hs := &http.Server{Addr: ":8021", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -78,8 +81,9 @@ func TestServer_Upload_File(t *testing.T) {
assert.Equal(t, `{"upload":`+fs+`}`, string(b))
}
-func TestServer_Upload_NestedFile(t *testing.T) {
- st := &Handler{
+func TestHandler_Upload_NestedFile(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -97,11 +101,12 @@ func TestServer_Upload_NestedFile(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8021", Handler: st}
+ hs := &http.Server{Addr: ":8021", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -139,8 +144,9 @@ func TestServer_Upload_NestedFile(t *testing.T) {
assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b))
}
-func TestServer_Upload_File_NoTmpDir(t *testing.T) {
- st := &Handler{
+func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -158,11 +164,12 @@ func TestServer_Upload_File_NoTmpDir(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8021", Handler: st}
+ hs := &http.Server{Addr: ":8021", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -200,8 +207,9 @@ func TestServer_Upload_File_NoTmpDir(t *testing.T) {
assert.Equal(t, `{"upload":`+fs+`}`, string(b))
}
-func TestServer_Upload_File_Forbids(t *testing.T) {
- st := &Handler{
+func TestHandler_Upload_File_Forbids(t *testing.T) {
+ h := &Handler{
+ ft: util.NewFastTime(time.Microsecond),
cfg: &Config{
MaxRequest: 1024,
Uploads: &UploadsConfig{
@@ -219,11 +227,12 @@ func TestServer_Upload_File_Forbids(t *testing.T) {
},
}),
}
+ defer h.ft.Stop()
- assert.NoError(t, st.rr.Start())
- defer st.rr.Stop()
+ assert.NoError(t, h.rr.Start())
+ defer h.rr.Stop()
- hs := &http.Server{Addr: ":8021", Handler: st}
+ hs := &http.Server{Addr: ":8021", Handler: h}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
diff --git a/service/rpc/config.go b/service/rpc/config.go
index 653da6ea..fc8cfdbb 100644
--- a/service/rpc/config.go
+++ b/service/rpc/config.go
@@ -13,7 +13,7 @@ type Config struct {
// Indicates if RPC connection is enabled.
Enable bool
- // AddListener string
+ // Listen string
Listen string
}
diff --git a/tests/http/echoDelay.php b/tests/http/echoDelay.php
new file mode 100644
index 00000000..2ee2b049
--- /dev/null
+++ b/tests/http/echoDelay.php
@@ -0,0 +1,11 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ sleep(1);
+ $resp->getBody()->write(strtoupper($req->getQueryParams()['hello']));
+ return $resp->withStatus(201);
+} \ No newline at end of file
diff --git a/tests/http/payload.php b/tests/http/payload.php
index a16984c5..52c0f819 100644
--- a/tests/http/payload.php
+++ b/tests/http/payload.php
@@ -5,6 +5,11 @@ use Psr\Http\Message\ServerRequestInterface;
function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
{
+ if ( $req->getHeaderLine("Content-Type") != 'application/json' ) {
+ $resp->getBody()->write("invalid content-type");
+ return $resp;
+ }
+
// we expect json body
$p = json_decode($req->getBody(), true);
$resp->getBody()->write(json_encode(array_flip($p)));
diff --git a/util/fasttime.go b/util/fasttime.go
index a18924cb..f1a81333 100644
--- a/util/fasttime.go
+++ b/util/fasttime.go
@@ -1,5 +1,41 @@
package util
+import (
+ "sync/atomic"
+ "time"
+)
+
// FastTime provides current unix time using specified resolution with reduced number of syscalls.
type FastTime struct {
+ last int64
+ ticker *time.Ticker
+}
+
+// NewFastTime returns new time provider with given resolution.
+func NewFastTime(resolution time.Duration) *FastTime {
+ ft := &FastTime{
+ last: time.Now().UnixNano(),
+ ticker: time.NewTicker(resolution),
+ }
+
+ go ft.run()
+
+ return ft
+}
+
+// Stop ticking.
+func (ft *FastTime) Stop() {
+ ft.ticker.Stop()
+}
+
+// UnixNano returns current timestamps. Value might be delayed after current time by specified resolution.
+func (ft *FastTime) UnixNano() int64 {
+ return atomic.LoadInt64(&ft.last)
+}
+
+// consume time values over given resolution.
+func (ft *FastTime) run() {
+ for range ft.ticker.C {
+ atomic.StoreInt64(&ft.last, time.Now().UnixNano())
+ }
}
diff --git a/util/fasttime_test.go b/util/fasttime_test.go
index c7d86821..c8ff0e13 100644
--- a/util/fasttime_test.go
+++ b/util/fasttime_test.go
@@ -1 +1,47 @@
package util
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestFTime_UnixNano(t *testing.T) {
+ ft := NewFastTime(time.Millisecond)
+ defer ft.Stop()
+
+ var d int64
+
+ d = time.Now().UnixNano() - ft.UnixNano()
+
+ assert.True(t, d >= 0)
+ assert.True(t, d <= int64(time.Millisecond*2))
+
+ time.Sleep(time.Millisecond * 100)
+ d = time.Now().UnixNano() - ft.UnixNano()
+
+ assert.True(t, d >= 0)
+ assert.True(t, d <= int64(time.Millisecond*2))
+}
+
+func Benchmark_FastTime(b *testing.B) {
+ ft := NewFastTime(time.Microsecond)
+ defer ft.Stop()
+
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ _ = ft.UnixNano()
+ }
+}
+
+func Benchmark_Time(b *testing.B) {
+ ft := NewFastTime(time.Microsecond)
+ defer ft.Stop()
+
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ _ = time.Now().UnixNano()
+ }
+}