summaryrefslogtreecommitdiff
path: root/service/http
diff options
context:
space:
mode:
Diffstat (limited to 'service/http')
-rw-r--r--service/http/handler_test.go18
-rw-r--r--service/http/rpc_test.go66
-rw-r--r--service/http/service.go23
-rw-r--r--service/http/service_test.go60
4 files changed, 150 insertions, 17 deletions
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index 4a11c562..59a4c7c0 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -51,13 +51,13 @@ func TestServer_Echo(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- body, r, err := get("http://localhost:8077/?hello=world")
+ body, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 201, r.StatusCode)
assert.Equal(t, "WORLD", body)
@@ -686,13 +686,13 @@ func TestServer_Error(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- _, r, err := get("http://localhost:8077/?hello=world")
+ _, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
}
@@ -720,13 +720,13 @@ func TestServer_Error2(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
time.Sleep(time.Millisecond * 10)
- _, r, err := get("http://localhost:8077/?hello=world")
+ _, r, err := get("http://localhost:8177/?hello=world")
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
}
@@ -754,7 +754,7 @@ func TestServer_Error3(t *testing.T) {
assert.NoError(t, st.rr.Start())
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -799,7 +799,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
st.rr.Start()
defer st.rr.Stop()
- hs := &http.Server{Addr: ":8077", Handler: st}
+ hs := &http.Server{Addr: ":8177", Handler: st}
defer hs.Shutdown(context.Background())
go func() { hs.ListenAndServe() }()
@@ -807,7 +807,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
bb := "WORLD"
for n := 0; n < b.N; n++ {
- r, err := http.Get("http://localhost:8077/?hello=world")
+ r, err := http.Get("http://localhost:8177/?hello=world")
if err != nil {
b.Fail()
}
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
index fc47a70f..c392b060 100644
--- a/service/http/rpc_test.go
+++ b/service/http/rpc_test.go
@@ -1,11 +1,14 @@
package http
import (
+ "encoding/json"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/rpc"
"github.com/stretchr/testify/assert"
+ "os"
+ "runtime"
"strconv"
"testing"
"time"
@@ -65,6 +68,67 @@ func Test_RPC(t *testing.T) {
assert.NotEqual(t, res, res2)
}
+func Test_RPC_Unix(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on " + runtime.GOOS)
+ }
+
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rpc.ID, &rpc.Service{})
+ c.Register(ID, &Service{})
+
+ sock := `unix://` + os.TempDir() + `/rpc.unix`
+ j, _ := json.Marshal(sock)
+
+ assert.NoError(t, c.Init(&testCfg{
+ rpcCfg: `{"enable":true, "listen":` + string(j) + `}`,
+ httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php pid pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, _ := c.Get(ID)
+ ss := s.(*Service)
+
+ s2, _ := c.Get(rpc.ID)
+ rs := s2.(*rpc.Service)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ res, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res)
+
+ cl, err := rs.Client()
+ assert.NoError(t, err)
+
+ r := ""
+ assert.NoError(t, cl.Call("http.Reset", true, &r))
+ assert.Equal(t, "OK", r)
+
+ res2, _, _ := get("http://localhost:6029")
+ assert.Equal(t, strconv.Itoa(*ss.rr.Workers()[0].Pid), res2)
+ assert.NotEqual(t, res, res2)
+}
+
func Test_Workers(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
@@ -74,7 +138,7 @@ func Test_Workers(t *testing.T) {
c.Register(ID, &Service{})
assert.NoError(t, c.Init(&testCfg{
- rpcCfg: `{"enable":true, "listen":"tcp://:5004"}`,
+ rpcCfg: `{"enable":true, "listen":"tcp://:5005"}`,
httpCfg: `{
"enable": true,
"address": ":6029",
diff --git a/service/http/service.go b/service/http/service.go
index 3d200845..cef019b3 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -7,6 +7,7 @@ import (
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
+ "sync/atomic"
)
// ID contains default svc name.
@@ -21,10 +22,11 @@ type Service struct {
lsns []func(event int, ctx interface{})
mdws []middleware
- mu sync.Mutex
- rr *roadrunner.Server
- srv *Handler
- http *http.Server
+ mu sync.Mutex
+ rr *roadrunner.Server
+ stopping int32
+ srv *Handler
+ http *http.Server
}
// AddMiddleware adds new net/http middleware.
@@ -95,6 +97,11 @@ func (s *Service) Serve() error {
// Stop stops the svc.
func (s *Service) Stop() {
+ if atomic.LoadInt32(&s.stopping) != 0 {
+ // already stopping
+ return
+ }
+
s.mu.Lock()
defer s.mu.Unlock()
if s.http == nil {
@@ -121,9 +128,11 @@ func (s *Service) listener(event int, ctx interface{}) {
}
if event == roadrunner.EventServerFailure {
- // attempting rr server restart
- if err := s.rr.Start(); err != nil {
- s.Stop()
+ if atomic.LoadInt32(&s.stopping) != 0 {
+ // attempting rr server restart
+ if err := s.rr.Start(); err != nil {
+ s.Stop()
+ }
}
}
}
diff --git a/service/http/service_test.go b/service/http/service_test.go
index 55fa660b..02d1c3f0 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) {
assert.Equal(t, "WORLD", string(b))
}
+func Test_Service_ErrorEcho(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echoerr pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ goterr := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventStderrOutput {
+ if string(ctx.([]byte)) == "WORLD\n" {
+ goterr <- nil
+ }
+ }
+ })
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ <-goterr
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+}
+
func Test_Service_Middleware(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)