diff options
Diffstat (limited to 'service/http')
-rw-r--r-- | service/http/handler_test.go | 18 | ||||
-rw-r--r-- | service/http/rpc_test.go | 66 | ||||
-rw-r--r-- | service/http/service.go | 23 | ||||
-rw-r--r-- | service/http/service_test.go | 60 |
4 files changed, 150 insertions, 17 deletions
diff --git a/service/http/handler_test.go b/service/http/handler_test.go index 4a11c562..59a4c7c0 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -51,13 +51,13 @@ func TestServer_Echo(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - body, r, err := get("http://localhost:8077/?hello=world") + body, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 201, r.StatusCode) assert.Equal(t, "WORLD", body) @@ -686,13 +686,13 @@ func TestServer_Error(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8077/?hello=world") + _, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) } @@ -720,13 +720,13 @@ func TestServer_Error2(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() time.Sleep(time.Millisecond * 10) - _, r, err := get("http://localhost:8077/?hello=world") + _, r, err := get("http://localhost:8177/?hello=world") assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) } @@ -754,7 +754,7 @@ func TestServer_Error3(t *testing.T) { assert.NoError(t, st.rr.Start()) defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -799,7 +799,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { st.rr.Start() defer st.rr.Stop() - hs := &http.Server{Addr: ":8077", Handler: st} + hs := &http.Server{Addr: ":8177", Handler: st} defer hs.Shutdown(context.Background()) go func() { hs.ListenAndServe() }() @@ -807,7 +807,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { bb := "WORLD" for n := 0; n < b.N; n++ { - r, err := http.Get("http://localhost:8077/?hello=world") + r, err := http.Get("http://localhost:8177/?hello=world") if err != nil { b.Fail() } diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go index fc47a70f..c392b060 100644 --- a/service/http/rpc_test.go +++ b/service/http/rpc_test.go @@ -1,11 +1,14 @@ package http import ( + "encoding/json" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/rpc" "github.com/stretchr/testify/assert" + "os" + "runtime" "strconv" "testing" "time" @@ -65,6 +68,67 @@ func Test_RPC(t *testing.T) { assert.NotEqual(t, res, res2) } +func Test_RPC_Unix(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on " + runtime.GOOS) + } + + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rpc.ID, &rpc.Service{}) + c.Register(ID, &Service{}) + + sock := `unix://` + os.TempDir() + `/rpc.unix` + j, _ := json.Marshal(sock) + + assert.NoError(t, c.Init(&testCfg{ + rpcCfg: `{"enable":true, "listen":` + string(j) + `}`, + httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php pid pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, _ := c.Get(ID) + ss := s.(*Service) + + s2, _ := c.Get(rpc.ID) + rs := s2.(*rpc.Service) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + res, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res) + + cl, err := rs.Client() + assert.NoError(t, err) + + r := "" + assert.NoError(t, cl.Call("http.Reset", true, &r)) + assert.Equal(t, "OK", r) + + res2, _, _ := get("http://localhost:6029") + assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2) + assert.NotEqual(t, res, res2) +} + func Test_Workers(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -74,7 +138,7 @@ func Test_Workers(t *testing.T) { c.Register(ID, &Service{}) assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`, + rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`, httpCfg: `{ "enable": true, "address": ":6029", diff --git a/service/http/service.go b/service/http/service.go index 3d200845..cef019b3 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -7,6 +7,7 @@ import ( "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" + "sync/atomic" ) // ID contains default svc name. @@ -21,10 +22,11 @@ type Service struct { lsns []func(event int, ctx interface{}) mdws []middleware - mu sync.Mutex - rr *roadrunner.Server - srv *Handler - http *http.Server + mu sync.Mutex + rr *roadrunner.Server + stopping int32 + srv *Handler + http *http.Server } // AddMiddleware adds new net/http middleware. @@ -95,6 +97,11 @@ func (s *Service) Serve() error { // Stop stops the svc. func (s *Service) Stop() { + if atomic.LoadInt32(&s.stopping) != 0 { + // already stopping + return + } + s.mu.Lock() defer s.mu.Unlock() if s.http == nil { @@ -121,9 +128,11 @@ func (s *Service) listener(event int, ctx interface{}) { } if event == roadrunner.EventServerFailure { - // attempting rr server restart - if err := s.rr.Start(); err != nil { - s.Stop() + if atomic.LoadInt32(&s.stopping) != 0 { + // attempting rr server restart + if err := s.rr.Start(); err != nil { + s.Stop() + } } } } diff --git a/service/http/service_test.go b/service/http/service_test.go index 55fa660b..02d1c3f0 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) { assert.Equal(t, "WORLD", string(b)) } +func Test_Service_ErrorEcho(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echoerr pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + goterr := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventStderrOutput { + if string(ctx.([]byte)) == "WORLD\n" { + goterr <- nil + } + } + }) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + <-goterr + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) +} + func Test_Service_Middleware(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) |