diff options
author | Wolfy-J <[email protected]> | 2019-01-05 13:35:58 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-01-05 13:35:58 +0300 |
commit | 04f7ac73c477938b7390ec6ec101d0e4bcc8cd80 (patch) | |
tree | 361c8cd8a00824d0b2e5003c6a5978f630c20469 | |
parent | 46009112a783a1fdae95e0a061d4c8c41a1c8ff1 (diff) |
second set of patches
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | osutil/cmd_group.go | 13 | ||||
-rw-r--r-- | osutil/cmd_group_win.go | 13 | ||||
-rw-r--r-- | pool.go | 2 | ||||
-rw-r--r-- | server.go | 2 | ||||
-rw-r--r-- | server_config.go | 4 | ||||
-rw-r--r-- | service/env/environment.go | 2 | ||||
-rw-r--r-- | service/http/handler.go | 43 | ||||
-rw-r--r-- | service/http/handler_test.go | 380 | ||||
-rw-r--r-- | service/http/service.go | 5 | ||||
-rw-r--r-- | service/http/uploads_test.go | 49 | ||||
-rw-r--r-- | service/rpc/config.go | 2 | ||||
-rw-r--r-- | tests/http/echoDelay.php | 11 | ||||
-rw-r--r-- | tests/http/payload.php | 5 | ||||
-rw-r--r-- | util/fasttime.go | 36 | ||||
-rw-r--r-- | util/fasttime_test.go | 46 |
16 files changed, 482 insertions, 133 deletions
@@ -10,7 +10,7 @@ uninstall: rm -f /usr/local/bin/rr test: go test -v -race -cover - go test -v -race -cover ./util + go test -v -race -cover ./util go test -v -race -cover ./service go test -v -race -cover ./service/env go test -v -race -cover ./service/rpc diff --git a/osutil/cmd_group.go b/osutil/cmd_group.go new file mode 100644 index 00000000..d4b64fb6 --- /dev/null +++ b/osutil/cmd_group.go @@ -0,0 +1,13 @@ +// +build !windows + +package osutil + +import ( + "os/exec" + "syscall" +) + +// IsolateProcess change gpid for the process to avoid bypassing signals to php processes. +func IsolateProcess(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0} +} diff --git a/osutil/cmd_group_win.go b/osutil/cmd_group_win.go new file mode 100644 index 00000000..ca7fca20 --- /dev/null +++ b/osutil/cmd_group_win.go @@ -0,0 +1,13 @@ +// +build windows + +package osutil + +import ( + "os/exec" + "syscall" +) + +// IsolateProcess change gpid for the process to avoid bypassing signals to php processes. +func IsolateProcess(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP} +} @@ -22,7 +22,7 @@ const ( // Pool managed set of inner worker processes. type Pool interface { - // AddListener all caused events to attached watcher. + // Listen all caused events to attached watcher. Listen(l func(event int, ctx interface{})) // Exec one task with given payload and context, returns result or error. @@ -176,7 +176,7 @@ func (s *Server) Pool() Pool { return s.pool } -// AddListener pool events. +// Listen pool events. func (s *Server) poolListener(event int, ctx interface{}) { if event == EventPoolError { // pool failure, rebuilding diff --git a/server_config.go b/server_config.go index 199ce27b..35965962 100644 --- a/server_config.go +++ b/server_config.go @@ -3,6 +3,7 @@ package roadrunner import ( "errors" "fmt" + "github.com/spiral/roadrunner/osutil" "net" "os" "os/exec" @@ -75,8 +76,7 @@ func (cfg *ServerConfig) makeCommand() func() *exec.Cmd { var cmd = strings.Split(cfg.Command, " ") return func() *exec.Cmd { cmd := exec.Command(cmd[0], cmd[1:]...) - - cmd.SysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP} + osutil.IsolateProcess(cmd) cmd.Env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay)) cmd.Env = append(cmd.Env, cfg.env...) diff --git a/service/env/environment.go b/service/env/environment.go index fe8f1dd8..ab8febf7 100644 --- a/service/env/environment.go +++ b/service/env/environment.go @@ -16,7 +16,7 @@ type Setter interface { SetEnv(key, value string) } -// Setter provides ability to set environment value. +// Getter provides ability to set environment value. type Getter interface { // GetEnv must return list of env variables. GetEnv() (map[string]string, error) diff --git a/service/http/handler.go b/service/http/handler.go index d7521959..8ff1bdeb 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -3,9 +3,11 @@ package http import ( "github.com/pkg/errors" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" "net/http" "strconv" "sync" + "time" ) const ( @@ -23,6 +25,14 @@ type ErrorEvent struct { // Error - associated error, if any. Error error + + // event timings + start, end int64 +} + +// Elapsed returns duration of the invocation. +func (e *ErrorEvent) Elapsed() time.Duration { + return time.Duration(e.end - e.start) } // ResponseEvent represents singular http response event. @@ -32,11 +42,20 @@ type ResponseEvent struct { // Response contains service response. Response *Response + + // event timings + start, end int64 +} + +// Elapsed returns duration of the invocation. +func (e *ResponseEvent) Elapsed() time.Duration { + return time.Duration(e.end - e.start) } // Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, // parsed files and query, payload will include parsed form dataTree (if any). type Handler struct { + ft *util.FastTime cfg *Config rr *roadrunner.Server mul sync.Mutex @@ -53,14 +72,16 @@ func (h *Handler) Listen(l func(event int, ctx interface{})) { // mdwr serve using PSR-7 requests passed to underlying application. Attempts to serve static files first if enabled. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := h.ft.UnixNano() + // validating request size if h.cfg.MaxRequest != 0 { if length := r.Header.Get("content-length"); length != "" { if size, err := strconv.ParseInt(length, 10, 64); err != nil { - h.handleError(w, r, err) + h.handleError(w, r, err, start) return } else if size > h.cfg.MaxRequest*1024*1024 { - h.handleError(w, r, errors.New("request body max size is exceeded")) + h.handleError(w, r, errors.New("request body max size is exceeded"), start) return } } @@ -68,7 +89,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { req, err := NewRequest(r, h.cfg.Uploads) if err != nil { - h.handleError(w, r, err) + h.handleError(w, r, err, start) return } @@ -77,37 +98,37 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { p, err := req.Payload() if err != nil { - h.handleError(w, r, err) + h.handleError(w, r, err, start) return } rsp, err := h.rr.Exec(p) if err != nil { - h.handleError(w, r, err) + h.handleError(w, r, err, start) return } resp, err := NewResponse(rsp) if err != nil { - h.handleError(w, r, err) + h.handleError(w, r, err, start) return } - h.handleResponse(req, resp) + h.handleResponse(req, resp, start) resp.Write(w) } // handleError sends error. -func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) { - h.throw(EventError, &ErrorEvent{Request: r, Error: err}) +func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, start int64) { + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, end: h.ft.UnixNano()}) w.WriteHeader(500) w.Write([]byte(err.Error())) } // handleResponse triggers response event. -func (h *Handler) handleResponse(req *Request, resp *Response) { - h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp}) +func (h *Handler) handleResponse(req *Request, resp *Response, start int64) { + h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp, start: start, end: h.ft.UnixNano()}) } // throw invokes event handler if any. diff --git a/service/http/handler_test.go b/service/http/handler_test.go index 1750bf43..d8a15202 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" "github.com/stretchr/testify/assert" "io/ioutil" "mime/multipart" @@ -29,8 +30,9 @@ func get(url string) (string, *http.Response, error) { return string(b), r, err } -func TestServer_Echo(t *testing.T) { - st := &Handler{ +func TestHandler_Echo(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -48,11 +50,12 @@ func TestServer_Echo(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8177", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -65,7 +68,8 @@ func TestServer_Echo(t *testing.T) { } func Test_HandlerErrors(t *testing.T) { - st := &Handler{ + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -83,16 +87,18 @@ func Test_HandlerErrors(t *testing.T) { }, }), } + defer h.ft.Stop() wr := httptest.NewRecorder() rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("data"))) - st.ServeHTTP(wr, rq) + h.ServeHTTP(wr, rq) assert.Equal(t, 500, wr.Code) } func Test_Handler_JSON_error(t *testing.T) { - st := &Handler{ + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -110,18 +116,20 @@ func Test_Handler_JSON_error(t *testing.T) { }, }), } + defer h.ft.Stop() wr := httptest.NewRecorder() rq := httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("{sd"))) rq.Header.Add("Content-Type", "application/json") rq.Header.Add("Content-Size", "3") - st.ServeHTTP(wr, rq) + h.ServeHTTP(wr, rq) assert.Equal(t, 500, wr.Code) } -func TestServer_Headers(t *testing.T) { - st := &Handler{ +func TestHandler_Headers(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -139,11 +147,12 @@ func TestServer_Headers(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8078", Handler: st} + hs := &http.Server{Addr: ":8078", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -167,8 +176,9 @@ func TestServer_Headers(t *testing.T) { assert.Equal(t, "SAMPLE", string(b)) } -func TestServer_Empty_User_Agent(t *testing.T) { - st := &Handler{ +func TestHandler_Empty_User_Agent(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -186,11 +196,12 @@ func TestServer_Empty_User_Agent(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8088", Handler: st} + hs := &http.Server{Addr: ":8088", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -213,9 +224,9 @@ func TestServer_Empty_User_Agent(t *testing.T) { assert.Equal(t, "", string(b)) } - -func TestServer_User_Agent(t *testing.T) { - st := &Handler{ +func TestHandler_User_Agent(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -233,11 +244,12 @@ func TestServer_User_Agent(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8088", Handler: st} + hs := &http.Server{Addr: ":8088", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -260,8 +272,9 @@ func TestServer_User_Agent(t *testing.T) { assert.Equal(t, "go-agent", string(b)) } -func TestServer_Cookies(t *testing.T) { - st := &Handler{ +func TestHandler_Cookies(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -279,11 +292,12 @@ func TestServer_Cookies(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8079", Handler: st} + hs := &http.Server{Addr: ":8079", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -311,8 +325,9 @@ func TestServer_Cookies(t *testing.T) { } } -func TestServer_JsonPayload_POST(t *testing.T) { - st := &Handler{ +func TestHandler_JsonPayload_POST(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -330,11 +345,12 @@ func TestServer_JsonPayload_POST(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8090", Handler: st} + hs := &http.Server{Addr: ":8090", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -361,8 +377,9 @@ func TestServer_JsonPayload_POST(t *testing.T) { assert.Equal(t, `{"value":"key"}`, string(b)) } -func TestServer_JsonPayload_PUT(t *testing.T) { - st := &Handler{ +func TestHandler_JsonPayload_PUT(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -380,11 +397,12 @@ func TestServer_JsonPayload_PUT(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8081", Handler: st} + hs := &http.Server{Addr: ":8081", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -407,8 +425,9 @@ func TestServer_JsonPayload_PUT(t *testing.T) { assert.Equal(t, `{"value":"key"}`, string(b)) } -func TestServer_JsonPayload_PATCH(t *testing.T) { - st := &Handler{ +func TestHandler_JsonPayload_PATCH(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -426,11 +445,12 @@ func TestServer_JsonPayload_PATCH(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8082", Handler: st} + hs := &http.Server{Addr: ":8082", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -453,8 +473,9 @@ func TestServer_JsonPayload_PATCH(t *testing.T) { assert.Equal(t, `{"value":"key"}`, string(b)) } -func TestServer_FormData_POST(t *testing.T) { - st := &Handler{ +func TestHandler_FormData_POST(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -472,11 +493,12 @@ func TestServer_FormData_POST(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8083", Handler: st} + hs := &http.Server{Addr: ":8083", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -511,8 +533,9 @@ func TestServer_FormData_POST(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_FormData_PUT(t *testing.T) { - st := &Handler{ +func TestHandler_FormData_PUT(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -530,11 +553,12 @@ func TestServer_FormData_PUT(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8084", Handler: st} + hs := &http.Server{Addr: ":8084", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -569,8 +593,9 @@ func TestServer_FormData_PUT(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_FormData_PATCH(t *testing.T) { - st := &Handler{ +func TestHandler_FormData_PATCH(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -588,11 +613,12 @@ func TestServer_FormData_PATCH(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8085", Handler: st} + hs := &http.Server{Addr: ":8085", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -627,8 +653,9 @@ func TestServer_FormData_PATCH(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_Multipart_POST(t *testing.T) { - st := &Handler{ +func TestHandler_Multipart_POST(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -646,11 +673,12 @@ func TestServer_Multipart_POST(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8019", Handler: st} + hs := &http.Server{Addr: ":8019", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -689,8 +717,9 @@ func TestServer_Multipart_POST(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_Multipart_PUT(t *testing.T) { - st := &Handler{ +func TestHandler_Multipart_PUT(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -708,11 +737,12 @@ func TestServer_Multipart_PUT(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8020", Handler: st} + hs := &http.Server{Addr: ":8020", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -751,8 +781,9 @@ func TestServer_Multipart_PUT(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_Multipart_PATCH(t *testing.T) { - st := &Handler{ +func TestHandler_Multipart_PATCH(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -770,11 +801,12 @@ func TestServer_Multipart_PATCH(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8021", Handler: st} + hs := &http.Server{Addr: ":8021", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -813,8 +845,9 @@ func TestServer_Multipart_PATCH(t *testing.T) { assert.Equal(t, `{"arr":{"c":{"p":"l","z":""},"x":{"y":{"e":"f","z":"y"}}},"key":"value","name":["name1","name2","name3"]}`, string(b)) } -func TestServer_Error(t *testing.T) { - st := &Handler{ +func TestHandler_Error(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -832,11 +865,12 @@ func TestServer_Error(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8177", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -847,8 +881,9 @@ func TestServer_Error(t *testing.T) { assert.Equal(t, 500, r.StatusCode) } -func TestServer_Error2(t *testing.T) { - st := &Handler{ +func TestHandler_Error2(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -866,11 +901,12 @@ func TestServer_Error2(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8177", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -881,8 +917,9 @@ func TestServer_Error2(t *testing.T) { assert.Equal(t, 500, r.StatusCode) } -func TestServer_Error3(t *testing.T) { - st := &Handler{ +func TestHandler_Error3(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1, Uploads: &UploadsConfig{ @@ -900,11 +937,12 @@ func TestServer_Error3(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8177", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -926,8 +964,161 @@ func TestServer_Error3(t *testing.T) { assert.Equal(t, 500, r.StatusCode) } +func TestHandler_ResponseDuration(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echo pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + defer h.ft.Stop() + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + gotresp := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventResponse { + c := ctx.(*ResponseEvent) + + if c.Elapsed() > 0 { + close(gotresp) + } + } + }) + + body, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-gotresp + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func TestHandler_ResponseDurationDelayed(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php echoDelay pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + defer h.ft.Stop() + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + gotresp := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventResponse { + c := ctx.(*ResponseEvent) + + if c.Elapsed() > time.Second { + close(gotresp) + } + } + }) + + body, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-gotresp + + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", body) +} + +func TestHandler_ErrorDuration(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), + cfg: &Config{ + MaxRequest: 1024, + Uploads: &UploadsConfig{ + Dir: os.TempDir(), + Forbid: []string{}, + }, + }, + rr: roadrunner.NewServer(&roadrunner.ServerConfig{ + Command: "php ../../tests/http/client.php error pipes", + Relay: "pipes", + Pool: &roadrunner.Config{ + NumWorkers: 1, + AllocateTimeout: 10000000, + DestroyTimeout: 10000000, + }, + }), + } + defer h.ft.Stop() + + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() + + hs := &http.Server{Addr: ":8177", Handler: h} + defer hs.Shutdown(context.Background()) + + go func() { hs.ListenAndServe() }() + time.Sleep(time.Millisecond * 10) + + goterr := make(chan interface{}) + h.Listen(func(event int, ctx interface{}) { + if event == EventError { + c := ctx.(*ErrorEvent) + + if c.Elapsed() > 0 { + close(goterr) + } + } + }) + + _, r, err := get("http://localhost:8177/?hello=world") + assert.NoError(t, err) + + <-goterr + + assert.Equal(t, 500, r.StatusCode) +} + func BenchmarkHandler_Listen_Echo(b *testing.B) { - st := &Handler{ + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -945,11 +1136,12 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { }, }), } + defer h.ft.Stop() - st.rr.Start() - defer st.rr.Stop() + h.rr.Start() + defer h.rr.Stop() - hs := &http.Server{Addr: ":8177", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() diff --git a/service/http/service.go b/service/http/service.go index ad59f887..eb97233d 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -7,11 +7,13 @@ import ( "github.com/spiral/roadrunner/service/env" "github.com/spiral/roadrunner/service/http/attributes" "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/util" "golang.org/x/net/http2" "net/http" "net/url" "strings" "sync" + "time" ) const ( @@ -77,8 +79,9 @@ func (s *Service) Serve() error { s.rr = roadrunner.NewServer(s.cfg.Workers) s.rr.Listen(s.throw) - s.handler = &Handler{cfg: s.cfg, rr: s.rr} + s.handler = &Handler{ft: util.NewFastTime(time.Microsecond), cfg: s.cfg, rr: s.rr} s.handler.Listen(s.throw) + defer s.handler.ft.Stop() s.http = &http.Server{Addr: s.cfg.Address, Handler: s} diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go index 96e95733..82e7586b 100644 --- a/service/http/uploads_test.go +++ b/service/http/uploads_test.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "encoding/json" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" "github.com/stretchr/testify/assert" "io" "io/ioutil" @@ -17,8 +18,9 @@ import ( "time" ) -func TestServer_Upload_File(t *testing.T) { - st := &Handler{ +func TestHandler_Upload_File(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -36,11 +38,12 @@ func TestServer_Upload_File(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8021", Handler: st} + hs := &http.Server{Addr: ":8021", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -78,8 +81,9 @@ func TestServer_Upload_File(t *testing.T) { assert.Equal(t, `{"upload":`+fs+`}`, string(b)) } -func TestServer_Upload_NestedFile(t *testing.T) { - st := &Handler{ +func TestHandler_Upload_NestedFile(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -97,11 +101,12 @@ func TestServer_Upload_NestedFile(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8021", Handler: st} + hs := &http.Server{Addr: ":8021", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -139,8 +144,9 @@ func TestServer_Upload_NestedFile(t *testing.T) { assert.Equal(t, `{"upload":{"x":{"y":{"z":[`+fs+`]}}}}`, string(b)) } -func TestServer_Upload_File_NoTmpDir(t *testing.T) { - st := &Handler{ +func TestHandler_Upload_File_NoTmpDir(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -158,11 +164,12 @@ func TestServer_Upload_File_NoTmpDir(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8021", Handler: st} + hs := &http.Server{Addr: ":8021", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -200,8 +207,9 @@ func TestServer_Upload_File_NoTmpDir(t *testing.T) { assert.Equal(t, `{"upload":`+fs+`}`, string(b)) } -func TestServer_Upload_File_Forbids(t *testing.T) { - st := &Handler{ +func TestHandler_Upload_File_Forbids(t *testing.T) { + h := &Handler{ + ft: util.NewFastTime(time.Microsecond), cfg: &Config{ MaxRequest: 1024, Uploads: &UploadsConfig{ @@ -219,11 +227,12 @@ func TestServer_Upload_File_Forbids(t *testing.T) { }, }), } + defer h.ft.Stop() - assert.NoError(t, st.rr.Start()) - defer st.rr.Stop() + assert.NoError(t, h.rr.Start()) + defer h.rr.Stop() - hs := &http.Server{Addr: ":8021", Handler: st} + hs := &http.Server{Addr: ":8021", Handler: h} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() diff --git a/service/rpc/config.go b/service/rpc/config.go index 653da6ea..fc8cfdbb 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -13,7 +13,7 @@ type Config struct { // Indicates if RPC connection is enabled. Enable bool - // AddListener string + // Listen string Listen string } diff --git a/tests/http/echoDelay.php b/tests/http/echoDelay.php new file mode 100644 index 00000000..2ee2b049 --- /dev/null +++ b/tests/http/echoDelay.php @@ -0,0 +1,11 @@ +<?php + +use \Psr\Http\Message\ServerRequestInterface; +use \Psr\Http\Message\ResponseInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + sleep(1); + $resp->getBody()->write(strtoupper($req->getQueryParams()['hello'])); + return $resp->withStatus(201); +}
\ No newline at end of file diff --git a/tests/http/payload.php b/tests/http/payload.php index a16984c5..52c0f819 100644 --- a/tests/http/payload.php +++ b/tests/http/payload.php @@ -5,6 +5,11 @@ use Psr\Http\Message\ServerRequestInterface; function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface { + if ( $req->getHeaderLine("Content-Type") != 'application/json' ) { + $resp->getBody()->write("invalid content-type"); + return $resp; + } + // we expect json body $p = json_decode($req->getBody(), true); $resp->getBody()->write(json_encode(array_flip($p))); diff --git a/util/fasttime.go b/util/fasttime.go index a18924cb..f1a81333 100644 --- a/util/fasttime.go +++ b/util/fasttime.go @@ -1,5 +1,41 @@ package util +import ( + "sync/atomic" + "time" +) + // FastTime provides current unix time using specified resolution with reduced number of syscalls. type FastTime struct { + last int64 + ticker *time.Ticker +} + +// NewFastTime returns new time provider with given resolution. +func NewFastTime(resolution time.Duration) *FastTime { + ft := &FastTime{ + last: time.Now().UnixNano(), + ticker: time.NewTicker(resolution), + } + + go ft.run() + + return ft +} + +// Stop ticking. +func (ft *FastTime) Stop() { + ft.ticker.Stop() +} + +// UnixNano returns current timestamps. Value might be delayed after current time by specified resolution. +func (ft *FastTime) UnixNano() int64 { + return atomic.LoadInt64(&ft.last) +} + +// consume time values over given resolution. +func (ft *FastTime) run() { + for range ft.ticker.C { + atomic.StoreInt64(&ft.last, time.Now().UnixNano()) + } } diff --git a/util/fasttime_test.go b/util/fasttime_test.go index c7d86821..c8ff0e13 100644 --- a/util/fasttime_test.go +++ b/util/fasttime_test.go @@ -1 +1,47 @@ package util + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestFTime_UnixNano(t *testing.T) { + ft := NewFastTime(time.Millisecond) + defer ft.Stop() + + var d int64 + + d = time.Now().UnixNano() - ft.UnixNano() + + assert.True(t, d >= 0) + assert.True(t, d <= int64(time.Millisecond*2)) + + time.Sleep(time.Millisecond * 100) + d = time.Now().UnixNano() - ft.UnixNano() + + assert.True(t, d >= 0) + assert.True(t, d <= int64(time.Millisecond*2)) +} + +func Benchmark_FastTime(b *testing.B) { + ft := NewFastTime(time.Microsecond) + defer ft.Stop() + + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + _ = ft.UnixNano() + } +} + +func Benchmark_Time(b *testing.B) { + ft := NewFastTime(time.Microsecond) + defer ft.Stop() + + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + _ = time.Now().UnixNano() + } +} |