summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/cmd/serve.go2
-rw-r--r--error_buffer_test.go91
-rw-r--r--pipe_factory.go14
-rw-r--r--pipe_factory_test.go46
-rw-r--r--protocol.go3
-rw-r--r--server_config.go5
-rw-r--r--server_config_test.go46
-rw-r--r--server_test.go11
-rw-r--r--service/container.go5
-rw-r--r--service/container_test.go10
-rw-r--r--service/env/config_test.go5
-rw-r--r--service/env/service_test.go18
-rw-r--r--service/headers/service_test.go56
-rw-r--r--service/health/service.go21
-rw-r--r--service/health/service_test.go54
-rw-r--r--service/http/attributes/attributes_test.go20
-rw-r--r--service/http/config.go15
-rw-r--r--service/http/h2c_test.go7
-rw-r--r--service/http/handler.go16
-rw-r--r--service/http/handler_test.go744
-rw-r--r--service/http/request.go9
-rw-r--r--service/http/response.go10
-rw-r--r--service/http/rpc_test.go21
-rw-r--r--service/http/service.go52
-rw-r--r--service/http/service_test.go77
-rw-r--r--service/http/ssl_test.go56
-rw-r--r--service/http/uploads.go43
-rw-r--r--service/http/uploads_test.go196
-rw-r--r--service/limit/config_test.go5
-rw-r--r--service/limit/controller.go7
-rw-r--r--service/limit/service_test.go66
-rw-r--r--service/metrics/rpc.go30
-rw-r--r--service/metrics/rpc_test.go7
-rw-r--r--service/metrics/service.go21
-rw-r--r--service/metrics/service_test.go37
-rw-r--r--service/rpc/config_test.go47
-rw-r--r--service/static/service_test.go70
-rw-r--r--socket_factory.go7
-rw-r--r--socket_factory_test.go207
-rw-r--r--static_pool.go12
-rw-r--r--static_pool_test.go48
-rw-r--r--util/network.go10
-rw-r--r--worker.go14
-rw-r--r--worker_test.go76
44 files changed, 1909 insertions, 408 deletions
diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go
index c754be46..1e8f53b2 100644
--- a/cmd/rr/cmd/serve.go
+++ b/cmd/rr/cmd/serve.go
@@ -36,7 +36,7 @@ func init() {
RunE: serveHandler,
})
- signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT)
+ signal.Notify(stopSignal, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
}
func serveHandler(cmd *cobra.Command, args []string) error {
diff --git a/error_buffer_test.go b/error_buffer_test.go
index 81107935..c163ea43 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -7,16 +7,29 @@ import (
func TestErrBuffer_Write_Len(t *testing.T) {
buf := newErrBuffer()
- defer buf.Close()
-
- buf.Write([]byte("hello"))
+ defer func() {
+ err := buf.Close()
+ if err != nil {
+ t.Errorf("error during closing the buffer: error %v", err)
+ }
+ }()
+
+ _, err := buf.Write([]byte("hello"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
assert.Equal(t, 5, buf.Len())
assert.Equal(t, "hello", buf.String())
}
func TestErrBuffer_Write_Event(t *testing.T) {
buf := newErrBuffer()
- defer buf.Close()
+ defer func() {
+ err := buf.Close()
+ if err != nil {
+ t.Errorf("error during closing the buffer: error %v", err)
+ }
+ }()
tr := make(chan interface{})
buf.Listen(func(event int, ctx interface{}) {
@@ -25,8 +38,10 @@ func TestErrBuffer_Write_Event(t *testing.T) {
close(tr)
})
- buf.Write([]byte("hello\n"))
-
+ _, err := buf.Write([]byte("hello\n"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
<-tr
// messages are read
@@ -35,7 +50,12 @@ func TestErrBuffer_Write_Event(t *testing.T) {
func TestErrBuffer_Write_Event_Separated(t *testing.T) {
buf := newErrBuffer()
- defer buf.Close()
+ defer func() {
+ err := buf.Close()
+ if err != nil {
+ t.Errorf("error during closing the buffer: error %v", err)
+ }
+ }()
tr := make(chan interface{})
buf.Listen(func(event int, ctx interface{}) {
@@ -44,9 +64,20 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) {
close(tr)
})
- buf.Write([]byte("hel"))
- buf.Write([]byte("lo\n"))
- buf.Write([]byte("ending"))
+ _, err := buf.Write([]byte("hel"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
+
+ _, err = buf.Write([]byte("lo\n"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
+
+ _, err = buf.Write([]byte("ending"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
<-tr
assert.Equal(t, 0, buf.Len())
@@ -55,11 +86,27 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) {
func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) {
buf := newErrBuffer()
- defer buf.Close()
-
- buf.Write([]byte("hel"))
- buf.Write([]byte("lo\n"))
- buf.Write([]byte("ending"))
+ defer func() {
+ err := buf.Close()
+ if err != nil {
+ t.Errorf("error during closing the buffer: error %v", err)
+ }
+ }()
+
+ _, err := buf.Write([]byte("hel"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
+
+ _, err = buf.Write([]byte("lo\n"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
+
+ _, err = buf.Write([]byte("ending"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
assert.Equal(t, 12, buf.Len())
assert.Equal(t, "hello\nending", buf.String())
@@ -67,9 +114,17 @@ func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) {
func TestErrBuffer_Write_Remaining(t *testing.T) {
buf := newErrBuffer()
- defer buf.Close()
-
- buf.Write([]byte("hel"))
+ defer func() {
+ err := buf.Close()
+ if err != nil {
+ t.Errorf("error during closing the buffer: error %v", err)
+ }
+ }()
+
+ _, err := buf.Write([]byte("hel"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
assert.Equal(t, 3, buf.Len())
assert.Equal(t, "hel", buf.String())
diff --git a/pipe_factory.go b/pipe_factory.go
index d6fe0420..d8243d28 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "fmt"
"github.com/pkg/errors"
"github.com/spiral/goridge"
"io"
@@ -45,11 +46,20 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
}
if pid, err := fetchPID(w.rl); pid != *w.Pid {
- go func(w *Worker) { w.Kill() }(w)
+ go func(w *Worker) {
+ err := w.Kill()
+ if err != nil {
+ // there is no logger here, how to handle error in goroutines ?
+ fmt.Println(fmt.Sprintf("error killing the worker with PID number %d, Created: %s", w.Pid, w.Created))
+ }
+ }(w)
if wErr := w.Wait(); wErr != nil {
if _, ok := wErr.(*exec.ExitError); ok {
- err = errors.Wrap(wErr, err.Error())
+ // error might be nil here
+ if err != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
} else {
err = wErr
}
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 9d50e47f..27d1f74d 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -17,12 +17,15 @@ func Test_Pipe_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- w.Stop()
+ assert.NoError(t, w.Stop())
}
func Test_Pipe_StartError(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- cmd.Start()
+ err := cmd.Start()
+ if err != nil {
+ t.Errorf("error running the command: error %v", err)
+ }
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
@@ -31,7 +34,10 @@ func Test_Pipe_StartError(t *testing.T) {
func Test_Pipe_PipeError(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- cmd.StdinPipe()
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
@@ -40,7 +46,10 @@ func Test_Pipe_PipeError(t *testing.T) {
func Test_Pipe_PipeError2(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- cmd.StdoutPipe()
+ _, err := cmd.StdinPipe()
+ if err != nil {
+ t.Errorf("error creating the STDIN pipe: error %v", err)
+ }
w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Error(t, err)
@@ -71,7 +80,12 @@ func Test_Pipe_Echo(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -93,7 +107,10 @@ func Test_Pipe_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ assert.Error(t, err)
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -112,7 +129,10 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
}
}()
- w.Stop()
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}
}
@@ -121,9 +141,17 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
w, _ := NewPipeFactory().SpawnWorker(cmd)
go func() {
- w.Wait()
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting the worker: error %v", err)
+ }
+ }()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}()
- defer w.Stop()
for n := 0; n < b.N; n++ {
if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
diff --git a/protocol.go b/protocol.go
index 5523a3e5..42649264 100644
--- a/protocol.go
+++ b/protocol.go
@@ -34,6 +34,9 @@ func fetchPID(rl goridge.Relay) (pid int, err error) {
}
body, p, err := rl.Receive()
+ if err != nil {
+ return 0, err
+ }
if !p.HasFlag(goridge.PayloadControl) {
return 0, fmt.Errorf("unexpected response, header is missing")
}
diff --git a/server_config.go b/server_config.go
index 5fb4a014..641c1866 100644
--- a/server_config.go
+++ b/server_config.go
@@ -128,7 +128,10 @@ func (cfg *ServerConfig) makeFactory() (Factory, error) {
}
if dsn[0] == "unix" {
- syscall.Unlink(dsn[1])
+ err := syscall.Unlink(dsn[1])
+ if err != nil {
+ return nil, err
+ }
}
ln, err := net.Listen(dsn[0], dsn[1])
diff --git a/server_config_test.go b/server_config_test.go
index 753da599..4f26d6ab 100644
--- a/server_config_test.go
+++ b/server_config_test.go
@@ -18,7 +18,12 @@ func Test_ServerConfig_PipeFactory(t *testing.T) {
f, err = cfg.makeFactory()
assert.NoError(t, err)
assert.NotNil(t, f)
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("error closing factory or underlying connections: error %v", err)
+ }
+ }()
assert.NoError(t, err)
assert.IsType(t, &PipeFactory{}, f)
@@ -26,21 +31,32 @@ func Test_ServerConfig_PipeFactory(t *testing.T) {
func Test_ServerConfig_SocketFactory(t *testing.T) {
cfg := &ServerConfig{Relay: "tcp://:9111"}
- f, err := cfg.makeFactory()
+ f1, err := cfg.makeFactory()
assert.NoError(t, err)
- assert.NotNil(t, f)
- defer f.Close()
+ assert.NotNil(t, f1)
+ defer func() {
+ err := f1.Close()
+
+ if err != nil {
+ t.Errorf("error closing factory or underlying connections: error %v", err)
+ }
+ }()
assert.NoError(t, err)
- assert.IsType(t, &SocketFactory{}, f)
- assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network())
- assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String())
+ assert.IsType(t, &SocketFactory{}, f1)
+ assert.Equal(t, "tcp", f1.(*SocketFactory).ls.Addr().Network())
+ assert.Equal(t, "[::]:9111", f1.(*SocketFactory).ls.Addr().String())
cfg = &ServerConfig{Relay: "tcp://localhost:9112"}
- f, err = cfg.makeFactory()
+ f, err := cfg.makeFactory()
assert.NoError(t, err)
assert.NotNil(t, f)
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("error closing factory or underlying connections: error %v", err)
+ }
+ }()
assert.NoError(t, err)
assert.IsType(t, &SocketFactory{}, f)
@@ -55,7 +71,12 @@ func Test_ServerConfig_UnixSocketFactory(t *testing.T) {
cfg := &ServerConfig{Relay: "unix://unix.sock"}
f, err := cfg.makeFactory()
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("error closing factory or underlying connections: error %v", err)
+ }
+ }()
assert.NoError(t, err)
assert.IsType(t, &SocketFactory{}, f)
@@ -131,7 +152,10 @@ func Test_ServerConfigDefaults(t *testing.T) {
Command: "php tests/client.php pipes",
}
- cfg.InitDefaults()
+ err := cfg.InitDefaults()
+ if err != nil {
+ t.Errorf("error during the InitDefaults: error %v", err)
+ }
assert.Equal(t, "pipes", cfg.Relay)
assert.Equal(t, time.Minute, cfg.Pool.AllocateTimeout)
diff --git a/server_test.go b/server_test.go
index c973d634..9ab480b1 100644
--- a/server_test.go
+++ b/server_test.go
@@ -205,7 +205,10 @@ func TestServer_ReplacePool(t *testing.T) {
}
})
- rr.Reset()
+ err := rr.Reset()
+ if err != nil {
+ t.Errorf("error resetting the pool: error %v", err)
+ }
<-constructed
for _, w := range rr.Workers() {
@@ -239,9 +242,11 @@ func TestServer_ServerFailure(t *testing.T) {
rr.pool.(*StaticPool).cmd = func() *exec.Cmd {
return exec.Command("php", "tests/client.php", "echo", "broken-connection")
}
-
// killing random worker and expecting pool to replace it
- rr.Workers()[0].cmd.Process.Kill()
+ err := rr.Workers()[0].cmd.Process.Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
<-failure
assert.True(t, true)
diff --git a/service/container.go b/service/container.go
index 0be4f853..742b4c3b 100644
--- a/service/container.go
+++ b/service/container.go
@@ -276,7 +276,10 @@ func (c *container) resolveValues(s interface{}, m reflect.Method, cfg Config) (
sc := reflect.New(v.Elem())
if dsc, ok := sc.Interface().(DefaultsConfig); ok {
- dsc.InitDefaults()
+ err := dsc.InitDefaults()
+ if err != nil {
+ return nil, err
+ }
if cfg == nil {
values = append(values, sc)
continue
diff --git a/service/container_test.go b/service/container_test.go
index 4628986a..ad4c1e64 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -67,7 +67,10 @@ type testCfg struct{ cfg string }
func (cfg *testCfg) Get(name string) Config {
vars := make(map[string]interface{})
- json.Unmarshal([]byte(cfg.cfg), &vars)
+ err := json.Unmarshal([]byte(cfg.cfg), &vars)
+ if err != nil {
+ panic("error unmarshalling the cfg.cfg value")
+ }
v, ok := vars[name]
if !ok {
@@ -439,6 +442,10 @@ func TestContainer_InitErrorB(t *testing.T) {
type testInitC struct{}
+func (r *testInitC) Test() bool {
+ return true
+}
+
func TestContainer_NoInit(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
@@ -462,7 +469,6 @@ func (c *DCfg) Hydrate(cfg Config) error {
if err := cfg.Unmarshal(c); err != nil {
return err
}
-
if c.V == "fail" {
return errors.New("failed config")
}
diff --git a/service/env/config_test.go b/service/env/config_test.go
index 50fbdaa5..226712c3 100644
--- a/service/env/config_test.go
+++ b/service/env/config_test.go
@@ -30,6 +30,9 @@ func Test_Config_Hydrate_Empty(t *testing.T) {
func Test_Config_Defaults(t *testing.T) {
c := &Config{}
- c.InitDefaults()
+ err := c.InitDefaults()
+ if err != nil {
+ t.Errorf("Test_Config_Defaults failed: error %v", err)
+ }
assert.Len(t, c.Values, 0)
}
diff --git a/service/env/service_test.go b/service/env/service_test.go
index c20bb76c..19cc03c7 100644
--- a/service/env/service_test.go
+++ b/service/env/service_test.go
@@ -11,8 +11,12 @@ func Test_NewService(t *testing.T) {
}
func Test_Init(t *testing.T) {
+ var err error
s := &Service{}
- s.Init(&Config{})
+ _, err = s.Init(&Config{})
+ if err != nil {
+ t.Errorf("error during the s.Init: error %v", err)
+ }
assert.Len(t, s.values, 1)
values, err := s.GetEnv()
@@ -21,9 +25,13 @@ func Test_Init(t *testing.T) {
}
func Test_Extend(t *testing.T) {
+ var err error
s := NewService(map[string]string{"RR": "version"})
- s.Init(&Config{Values: map[string]string{"key": "value"}})
+ _, err = s.Init(&Config{Values: map[string]string{"key": "value"}})
+ if err != nil {
+ t.Errorf("error during the s.Init: error %v", err)
+ }
assert.Len(t, s.values, 2)
values, err := s.GetEnv()
@@ -34,9 +42,13 @@ func Test_Extend(t *testing.T) {
}
func Test_Set(t *testing.T) {
+ var err error
s := NewService(map[string]string{"RR": "version"})
- s.Init(&Config{Values: map[string]string{"key": "value"}})
+ _, err = s.Init(&Config{Values: map[string]string{"key": "value"}})
+ if err != nil {
+ t.Errorf("error during the s.Init: error %v", err)
+ }
assert.Len(t, s.values, 2)
s.SetEnv("key", "value-new")
diff --git a/service/headers/service_test.go b/service/headers/service_test.go
index 250f4458..2f29db5e 100644
--- a/service/headers/service_test.go
+++ b/service/headers/service_test.go
@@ -59,7 +59,12 @@ func Test_RequestHeaders(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -68,7 +73,12 @@ func Test_RequestHeaders(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -103,7 +113,12 @@ func Test_ResponseHeaders(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -112,7 +127,12 @@ func Test_ResponseHeaders(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
assert.Equal(t, "output-header", r.Header.Get("output"))
@@ -157,7 +177,12 @@ func TestCORS_OPTIONS(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -166,7 +191,12 @@ func TestCORS_OPTIONS(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials"))
assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers"))
@@ -215,7 +245,12 @@ func TestCORS_Pass(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -224,7 +259,12 @@ func TestCORS_Pass(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials"))
assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers"))
diff --git a/service/health/service.go b/service/health/service.go
index c0be68e0..c82f43b5 100644
--- a/service/health/service.go
+++ b/service/health/service.go
@@ -2,6 +2,8 @@ package health
import (
"context"
+ "fmt"
+ "github.com/sirupsen/logrus"
"net/http"
"sync"
@@ -14,19 +16,21 @@ const ID = "health"
// Service to serve an endpoint for checking the health of the worker pool
type Service struct {
cfg *Config
+ log *logrus.Logger
mu sync.Mutex
http *http.Server
httpService *rrhttp.Service
}
// Init health service
-func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rrhttp.Service, log *logrus.Logger) (bool, error) {
// Ensure the httpService is set
if r == nil {
return false, nil
}
s.cfg = cfg
+ s.log = log
s.httpService = r
return true, nil
}
@@ -37,7 +41,13 @@ func (s *Service) Serve() error {
s.mu.Lock()
s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
s.mu.Unlock()
- return s.http.ListenAndServe()
+
+ err := s.http.ListenAndServe()
+ if err == nil || err == http.ErrServerClosed {
+ return nil
+ }
+
+ return err
}
// Stop the health endpoint
@@ -47,7 +57,12 @@ func (s *Service) Stop() {
if s.http != nil {
// gracefully stop the server
- go s.http.Shutdown(context.Background())
+ go func() {
+ err := s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err))
+ }
+ }()
}
}
diff --git a/service/health/service_test.go b/service/health/service_test.go
index 76462df9..d346d92b 100644
--- a/service/health/service_test.go
+++ b/service/health/service_test.go
@@ -66,7 +66,12 @@ func TestService_Serve(t *testing.T) {
assert.NotNil(t, hS)
assert.Equal(t, service.StatusOK, httpStatus)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 500)
defer c.Stop()
@@ -104,13 +109,21 @@ func TestService_Serve_DeadWorker(t *testing.T) {
assert.NotNil(t, hS)
assert.Equal(t, service.StatusOK, httpStatus)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("server error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 500)
defer c.Stop()
// Kill the worker
httpSvc := hS.(*rrhttp.Service)
- httpSvc.Server().Workers()[0].Kill()
+ err := httpSvc.Server().Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the worker: error %v", err)
+ }
// Check health check
_, res, err := get("http://localhost:2116/")
@@ -147,13 +160,21 @@ func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) {
assert.NotNil(t, hS)
assert.Equal(t, service.StatusOK, httpStatus)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Second * 1)
defer c.Stop()
// Kill one of the workers
httpSvc := hS.(*rrhttp.Service)
- httpSvc.Server().Workers()[0].Kill()
+ err := httpSvc.Server().Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the worker: error %v", err)
+ }
// Check health check
_, res, err := get("http://localhost:2116/")
@@ -210,7 +231,12 @@ func TestService_Serve_NoServer(t *testing.T) {
assert.NotNil(t, hS)
assert.Equal(t, service.StatusOK, httpStatus)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 500)
defer c.Stop()
@@ -253,7 +279,12 @@ func TestService_Serve_NoPool(t *testing.T) {
assert.NotNil(t, hS)
assert.Equal(t, service.StatusOK, httpStatus)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 500)
defer c.Stop()
@@ -271,8 +302,15 @@ func get(url string) (string, *http.Response, error) {
if err != nil {
return "", nil, err
}
- defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
return string(b), r, err
}
diff --git a/service/http/attributes/attributes_test.go b/service/http/attributes/attributes_test.go
index a71d6542..2360fd12 100644
--- a/service/http/attributes/attributes_test.go
+++ b/service/http/attributes/attributes_test.go
@@ -10,7 +10,10 @@ func TestAllAttributes(t *testing.T) {
r := &http.Request{}
r = Init(r)
- Set(r, "key", "value")
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
assert.Equal(t, All(r), map[string]interface{}{
"key": "value",
@@ -34,7 +37,10 @@ func TestGetAttribute(t *testing.T) {
r := &http.Request{}
r = Init(r)
- Set(r, "key", "value")
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
assert.Equal(t, Get(r, "key"), "value")
}
@@ -55,13 +61,19 @@ func TestSetAttribute(t *testing.T) {
r := &http.Request{}
r = Init(r)
- Set(r, "key", "value")
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
assert.Equal(t, Get(r, "key"), "value")
}
func TestSetAttributeNone(t *testing.T) {
r := &http.Request{}
- Set(r, "key", "value")
+ err := Set(r, "key", "value")
+ if err != nil {
+ t.Errorf("error during the Set: error %v", err)
+ }
assert.Equal(t, Get(r, "key"), nil)
}
diff --git a/service/http/config.go b/service/http/config.go
index ba3e6300..13a2cfc9 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -126,9 +126,18 @@ func (c *Config) Hydrate(cfg service.Config) error {
c.SSL.Port = 443
}
- c.HTTP2.InitDefaults()
- c.Uploads.InitDefaults()
- c.Workers.InitDefaults()
+ err := c.HTTP2.InitDefaults()
+ if err != nil {
+ return err
+ }
+ err = c.Uploads.InitDefaults()
+ if err != nil {
+ return err
+ }
+ err = c.Workers.InitDefaults()
+ if err != nil {
+ return err
+ }
if err := cfg.Unmarshal(c); err != nil {
return err
diff --git a/service/http/h2c_test.go b/service/http/h2c_test.go
index d806e5ff..e2669845 100644
--- a/service/http/h2c_test.go
+++ b/service/http/h2c_test.go
@@ -49,7 +49,12 @@ func Test_Service_H2C(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("fail to close the Body: error %v", err)
+ }
+ }()
assert.Equal(t, "101 Switching Protocols", r.Status)
diff --git a/service/http/handler.go b/service/http/handler.go
index a4da224d..3c667035 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -2,6 +2,7 @@ package http
import (
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"net"
"net/http"
@@ -59,6 +60,7 @@ func (e *ResponseEvent) Elapsed() time.Duration {
// parsed files and query, payload will include parsed form dataTree (if any).
type Handler struct {
cfg *Config
+ log *logrus.Logger
rr *roadrunner.Server
mul sync.Mutex
lsn func(event int, ctx interface{})
@@ -98,8 +100,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// proxy IP resolution
h.resolveIP(req)
- req.Open()
- defer req.Close()
+ req.Open(h.log)
+ defer req.Close(h.log)
p, err := req.Payload()
if err != nil {
@@ -120,7 +122,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
h.handleResponse(req, resp, start)
- resp.Write(w)
+ err = resp.Write(w)
+ if err != nil {
+ h.handleError(w, r, err, start)
+ }
}
// handleError sends error.
@@ -128,7 +133,10 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error,
h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
w.WriteHeader(500)
- w.Write([]byte(err.Error()))
+ _, err = w.Write([]byte(err.Error()))
+ if err != nil {
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)})
+ }
}
// handleResponse triggers response event.
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index e29b76ac..4efcdd00 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -23,9 +23,15 @@ func get(url string) (string, *http.Response, error) {
if err != nil {
return "", nil, err
}
- defer r.Body.Close()
-
b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
return string(b), r, err
}
@@ -44,9 +50,16 @@ func getHeader(url string, h map[string]string) (string, *http.Response, error)
if err != nil {
return "", nil, err
}
- defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
return string(b), r, err
}
@@ -74,9 +87,19 @@ func TestHandler_Echo(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
body, r, err := get("http://localhost:8177/?hello=world")
@@ -165,10 +188,20 @@ func TestHandler_Headers(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8078", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
- time.Sleep(time.Millisecond * 10)
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil)
assert.NoError(t, err)
@@ -177,7 +210,13 @@ func TestHandler_Headers(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -212,9 +251,19 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8088", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
@@ -224,7 +273,13 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -258,9 +313,19 @@ func TestHandler_User_Agent(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8088", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil)
@@ -270,7 +335,13 @@ func TestHandler_User_Agent(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -304,9 +375,19 @@ func TestHandler_Cookies(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8079", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest("GET", "http://localhost:8079", nil)
@@ -316,7 +397,13 @@ func TestHandler_Cookies(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -355,9 +442,19 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8090", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest(
@@ -371,7 +468,13 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -405,9 +508,19 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8081", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
@@ -417,7 +530,12 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -451,9 +569,19 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8082", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`))
@@ -463,7 +591,13 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -497,9 +631,19 @@ func TestHandler_FormData_POST(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8083", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
form := url.Values{}
@@ -520,7 +664,13 @@ func TestHandler_FormData_POST(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -555,9 +705,19 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8083", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
form := url.Values{}
@@ -572,7 +732,13 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -607,9 +773,19 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8083", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
form := url.Values{}
@@ -630,7 +806,13 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -665,9 +847,19 @@ func TestHandler_FormData_PUT(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8084", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
form := url.Values{}
@@ -688,7 +880,13 @@ func TestHandler_FormData_PUT(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -723,9 +921,19 @@ func TestHandler_FormData_PATCH(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8085", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
form := url.Values{}
@@ -746,7 +954,13 @@ func TestHandler_FormData_PATCH(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -781,25 +995,72 @@ func TestHandler_Multipart_POST(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8019", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
- w.WriteField("key", "value")
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name1")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name2")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.WriteField("key", "value")
- w.WriteField("name[]", "name1")
- w.WriteField("name[]", "name2")
- w.WriteField("name[]", "name3")
- w.WriteField("arr[x][y][z]", "y")
- w.WriteField("arr[x][y][e]", "f")
- w.WriteField("arr[c]p", "l")
- w.WriteField("arr[c]z", "")
+ err = w.WriteField("arr[x][y][e]", "f")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.Close()
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -808,7 +1069,13 @@ func TestHandler_Multipart_POST(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -843,25 +1110,72 @@ func TestHandler_Multipart_PUT(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8020", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
- w.WriteField("key", "value")
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.WriteField("key", "value")
- w.WriteField("name[]", "name1")
- w.WriteField("name[]", "name2")
- w.WriteField("name[]", "name3")
- w.WriteField("arr[x][y][z]", "y")
- w.WriteField("arr[x][y][e]", "f")
- w.WriteField("arr[c]p", "l")
- w.WriteField("arr[c]z", "")
+ err = w.WriteField("name[]", "name1")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.Close()
+ err = w.WriteField("name[]", "name2")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][e]", "f")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -870,7 +1184,13 @@ func TestHandler_Multipart_PUT(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -905,25 +1225,72 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8021", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
- w.WriteField("key", "value")
+ err := w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("key", "value")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name1")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.WriteField("key", "value")
- w.WriteField("name[]", "name1")
- w.WriteField("name[]", "name2")
- w.WriteField("name[]", "name3")
- w.WriteField("arr[x][y][z]", "y")
- w.WriteField("arr[x][y][e]", "f")
- w.WriteField("arr[c]p", "l")
- w.WriteField("arr[c]z", "")
+ err = w.WriteField("name[]", "name2")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("name[]", "name3")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
- w.Close()
+ err = w.WriteField("arr[x][y][z]", "y")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[x][y][e]", "f")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]p", "l")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.WriteField("arr[c]z", "")
+ if err != nil {
+ t.Errorf("error writing the field: error %v", err)
+ }
+
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the writer: error %v", err)
+ }
req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -932,7 +1299,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -967,9 +1340,19 @@ func TestHandler_Error(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
_, r, err := get("http://localhost:8177/?hello=world")
@@ -1001,9 +1384,19 @@ func TestHandler_Error2(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
_, r, err := get("http://localhost:8177/?hello=world")
@@ -1035,9 +1428,19 @@ func TestHandler_Error3(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
b2 := &bytes.Buffer{}
@@ -1050,7 +1453,13 @@ func TestHandler_Error3(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the closing Body: error %v", err)
+
+ }
+ }()
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
@@ -1080,9 +1489,19 @@ func TestHandler_ResponseDuration(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
gotresp := make(chan interface{})
@@ -1129,9 +1548,19 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
gotresp := make(chan interface{})
@@ -1178,9 +1607,19 @@ func TestHandler_ErrorDuration(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
goterr := make(chan interface{})
@@ -1231,15 +1670,28 @@ func TestHandler_IP(t *testing.T) {
}),
}
- h.cfg.parseCIDRs()
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
assert.NoError(t, h.rr.Start())
defer h.rr.Stop()
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
body, r, err := get("http://127.0.0.1:8177/")
@@ -1277,15 +1729,28 @@ func TestHandler_XRealIP(t *testing.T) {
}),
}
- h.cfg.parseCIDRs()
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
assert.NoError(t, h.rr.Start())
defer h.rr.Stop()
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
@@ -1328,15 +1793,27 @@ func TestHandler_XForwardedFor(t *testing.T) {
}),
}
- h.cfg.parseCIDRs()
-
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
assert.NoError(t, h.rr.Start())
defer h.rr.Stop()
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
@@ -1379,15 +1856,27 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
}),
}
- h.cfg.parseCIDRs()
-
+ err := h.cfg.parseCIDRs()
+ if err != nil {
+ t.Errorf("error parsing CIDRs: error %v", err)
+ }
assert.NoError(t, h.rr.Start())
defer h.rr.Stop()
hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{
@@ -1419,13 +1908,26 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
}),
}
- h.rr.Start()
+ err := h.rr.Start()
+ if err != nil {
+ b.Errorf("error starting the worker pool: error %v", err)
+ }
defer h.rr.Stop()
hs := &http.Server{Addr: ":8177", Handler: h}
- defer hs.Shutdown(context.Background())
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ b.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
- go func() { hs.ListenAndServe() }()
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ b.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
bb := "WORLD"
@@ -1434,11 +1936,21 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
if err != nil {
b.Fail()
}
- defer r.Body.Close()
-
- br, _ := ioutil.ReadAll(r.Body)
- if string(br) != bb {
- b.Fail()
+ // Response might be nil here
+ if r != nil {
+ br, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ b.Errorf("error reading Body: error %v", err)
+ }
+ if string(br) != bb {
+ b.Fail()
+ }
+ err = r.Body.Close()
+ if err != nil {
+ b.Errorf("error closing the Body: error %v", err)
+ }
+ } else {
+ b.Errorf("got nil response")
}
}
}
diff --git a/service/http/request.go b/service/http/request.go
index 98508342..5d91bfb6 100644
--- a/service/http/request.go
+++ b/service/http/request.go
@@ -3,6 +3,7 @@ package http
import (
"encoding/json"
"fmt"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service/http/attributes"
"io/ioutil"
@@ -112,21 +113,21 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
}
// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open() {
+func (r *Request) Open(log *logrus.Logger) {
if r.Uploads == nil {
return
}
- r.Uploads.Open()
+ r.Uploads.Open(log)
}
// Close clears all temp file uploads
-func (r *Request) Close() {
+func (r *Request) Close(log *logrus.Logger) {
if r.Uploads == nil {
return
}
- r.Uploads.Clear()
+ r.Uploads.Clear(log)
}
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
diff --git a/service/http/response.go b/service/http/response.go
index 166ced82..aafaed13 100644
--- a/service/http/response.go
+++ b/service/http/response.go
@@ -36,7 +36,10 @@ func (r *Response) Write(w http.ResponseWriter) error {
p, h := handlePushHeaders(r.Headers)
if pusher, ok := w.(http.Pusher); ok {
for _, v := range p {
- pusher.Push(v, nil)
+ err := pusher.Push(v, nil)
+ if err != nil {
+ return err
+ }
}
}
@@ -50,7 +53,10 @@ func (r *Response) Write(w http.ResponseWriter) error {
w.WriteHeader(r.Status)
if data, ok := r.body.([]byte); ok {
- w.Write(data)
+ _, err := w.Write(data)
+ if err != nil {
+ return err
+ }
}
if rc, ok := r.body.(io.Reader); ok {
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
index 669b201c..0e4b2c0a 100644
--- a/service/http/rpc_test.go
+++ b/service/http/rpc_test.go
@@ -49,7 +49,12 @@ func Test_RPC(t *testing.T) {
s2, _ := c.Get(rpc.ID)
rs := s2.(*rpc.Service)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -110,7 +115,12 @@ func Test_RPC_Unix(t *testing.T) {
s2, _ := c.Get(rpc.ID)
rs := s2.(*rpc.Service)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -164,7 +174,12 @@ func Test_Workers(t *testing.T) {
s2, _ := c.Get(rpc.ID)
rs := s2.(*rpc.Service)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
diff --git a/service/http/service.go b/service/http/service.go
index 945a12c4..abe7b3a7 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -3,6 +3,7 @@ package http
import (
"context"
"fmt"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service/env"
"github.com/spiral/roadrunner/service/http/attributes"
@@ -31,6 +32,7 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc
// Service manages rr, http servers.
type Service struct {
cfg *Config
+ log *logrus.Logger
cprod roadrunner.CommandProducer
env env.Environment
lsns []func(event int, ctx interface{})
@@ -66,8 +68,9 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) {
s.cfg = cfg
+ s.log = log
s.env = e
if r != nil {
@@ -139,19 +142,34 @@ func (s *Service) Serve() error {
if s.http != nil {
go func() {
- err <- s.http.ListenAndServe()
+ httpErr := s.http.ListenAndServe()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
if s.https != nil {
go func() {
- err <- s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key)
+ httpErr := s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key)
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
if s.fcgi != nil {
go func() {
- err <- s.serveFCGI()
+ httpErr := s.serveFCGI()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
@@ -164,15 +182,35 @@ func (s *Service) Stop() {
defer s.mu.Unlock()
if s.fcgi != nil {
- go s.fcgi.Shutdown(context.Background())
+ go func() {
+ err := s.fcgi.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ // Stop() error
+ // push error from goroutines to the channel and block unil error or success shutdown or timeout
+ s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err))
+ return
+ }
+ }()
}
if s.https != nil {
- go s.https.Shutdown(context.Background())
+ go func() {
+ err := s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err))
+ return
+ }
+ }()
}
if s.http != nil {
- go s.http.Shutdown(context.Background())
+ go func() {
+ err := s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err))
+ return
+ }
+ }()
}
}
diff --git a/service/http/service_test.go b/service/http/service_test.go
index 69cb7003..c4b2c2c4 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -53,7 +53,8 @@ func Test_Service_NoConfig(t *testing.T) {
c := service.NewContainer(logger)
c.Register(ID, &Service{})
- c.Init(&testCfg{httpCfg: `{"Enable":true}`})
+ err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
+ assert.Error(t, err)
s, st := c.Get(ID)
assert.NotNil(t, s)
@@ -138,7 +139,12 @@ func Test_Service_Echo(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -147,7 +153,12 @@ func Test_Service_Echo(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -191,7 +202,12 @@ func Test_Service_Env(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -200,7 +216,12 @@ func Test_Service_Env(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -249,7 +270,12 @@ func Test_Service_ErrorEcho(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -258,7 +284,12 @@ func Test_Service_ErrorEcho(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -304,14 +335,22 @@ func Test_Service_Middleware(t *testing.T) {
return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/halt" {
w.WriteHeader(500)
- w.Write([]byte("halted"))
+ _, err := w.Write([]byte("halted"))
+ if err != nil {
+ t.Errorf("error writing the data to the http reply: error %v", err)
+ }
} else {
f(w, r)
}
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -320,7 +359,6 @@ func Test_Service_Middleware(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -329,19 +367,27 @@ func Test_Service_Middleware(t *testing.T) {
assert.Equal(t, 201, r.StatusCode)
assert.Equal(t, "WORLD", string(b))
+ err = r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+
req, err = http.NewRequest("GET", "http://localhost:6029/halt", nil)
assert.NoError(t, err)
r, err = http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
-
b, err = ioutil.ReadAll(r.Body)
assert.NoError(t, err)
assert.NoError(t, err)
assert.Equal(t, 500, r.StatusCode)
assert.Equal(t, "halted", string(b))
+
+ err = r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
}
func Test_Service_Listener(t *testing.T) {
@@ -381,7 +427,12 @@ func Test_Service_Listener(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
c.Stop()
diff --git a/service/http/ssl_test.go b/service/http/ssl_test.go
index 63eb90b1..c9b4d090 100644
--- a/service/http/ssl_test.go
+++ b/service/http/ssl_test.go
@@ -47,7 +47,12 @@ func Test_SSL_Service_Echo(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -56,7 +61,12 @@ func Test_SSL_Service_Echo(t *testing.T) {
r, err := sslClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("fail to close the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -93,7 +103,12 @@ func Test_SSL_Service_NoRedirect(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -102,7 +117,12 @@ func Test_SSL_Service_NoRedirect(t *testing.T) {
r, err := sslClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("fail to close the Body: error %v", err)
+ }
+ }()
assert.Nil(t, r.TLS)
@@ -142,7 +162,12 @@ func Test_SSL_Service_Redirect(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -151,7 +176,12 @@ func Test_SSL_Service_Redirect(t *testing.T) {
r, err := sslClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("fail to close the Body: error %v", err)
+ }
+ }()
assert.NotNil(t, r.TLS)
@@ -191,7 +221,12 @@ func Test_SSL_Service_Push(t *testing.T) {
// should do nothing
s.(*Service).Stop()
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -200,7 +235,12 @@ func Test_SSL_Service_Push(t *testing.T) {
r, err := sslClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("fail to close the Body: error %v", err)
+ }
+ }()
assert.NotNil(t, r.TLS)
diff --git a/service/http/uploads.go b/service/http/uploads.go
index 7610ab28..8a46f230 100644
--- a/service/http/uploads.go
+++ b/service/http/uploads.go
@@ -2,6 +2,8 @@ package http
import (
"encoding/json"
+ "fmt"
+ "github.com/sirupsen/logrus"
"io"
"io/ioutil"
"mime/multipart"
@@ -45,13 +47,16 @@ func (u *Uploads) MarshalJSON() ([]byte, error) {
// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
// will be handled individually.
-func (u *Uploads) Open() {
+func (u *Uploads) Open(log *logrus.Logger) {
var wg sync.WaitGroup
for _, f := range u.list {
wg.Add(1)
go func(f *FileUpload) {
defer wg.Done()
- f.Open(u.cfg)
+ err := f.Open(u.cfg)
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error opening the file: error %v", err))
+ }
}(f)
}
@@ -59,10 +64,13 @@ func (u *Uploads) Open() {
}
// Clear deletes all temporary files.
-func (u *Uploads) Clear() {
+func (u *Uploads) Clear(log *logrus.Logger) {
for _, f := range u.list {
if f.TempFilename != "" && exists(f.TempFilename) {
- os.Remove(f.TempFilename)
+ err := os.Remove(f.TempFilename)
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error removing the file: error %v", err))
+ }
}
}
}
@@ -99,7 +107,13 @@ func NewUpload(f *multipart.FileHeader) *FileUpload {
}
// Open moves file content into temporary file available for PHP.
-func (f *FileUpload) Open(cfg *UploadsConfig) error {
+// NOTE:
+// There is 2 deferred functions, and in case of getting 2 errors from both functions
+// error from close of temp file would be overwritten by error from the main file
+// STACK
+// DEFER FILE CLOSE (2)
+// DEFER TMP CLOSE (1)
+func (f *FileUpload) Open(cfg *UploadsConfig) (err error) {
if cfg.Forbids(f.Name) {
f.Error = UploadErrorExtension
return nil
@@ -110,7 +124,11 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error {
f.Error = UploadErrorNoFile
return err
}
- defer file.Close()
+
+ defer func() {
+ // close the main file
+ err = file.Close()
+ }()
tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
if err != nil {
@@ -120,7 +138,10 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error {
}
f.TempFilename = tmp.Name()
- defer tmp.Close()
+ defer func() {
+ // close the temp file
+ err = tmp.Close()
+ }()
if f.Size, err = io.Copy(tmp, file); err != nil {
f.Error = UploadErrorCantWrite
@@ -131,10 +152,8 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error {
// exists if file exists.
func exists(path string) bool {
- _, err := os.Stat(path)
- if err == nil {
- return true
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ return false
}
-
- return false
+ return true
}
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
index 0fbf0e14..1890c02b 100644
--- a/service/http/uploads_test.go
+++ b/service/http/uploads_test.go
@@ -6,6 +6,7 @@ import (
"crypto/md5"
"encoding/hex"
"encoding/json"
+ "fmt"
"github.com/spiral/roadrunner"
"github.com/stretchr/testify/assert"
"io"
@@ -41,22 +42,43 @@ func TestHandler_Upload_File(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8021", Handler: h}
- defer hs.Shutdown(context.Background())
-
- go func() { hs.ListenAndServe() }()
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
f := mustOpen("uploads_test.go")
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
fw, err := w.CreateFormFile("upload", f.Name())
assert.NotNil(t, fw)
assert.NoError(t, err)
- io.Copy(fw, f)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
- w.Close()
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -65,7 +87,12 @@ func TestHandler_Upload_File(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -102,22 +129,43 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8021", Handler: h}
- defer hs.Shutdown(context.Background())
-
- go func() { hs.ListenAndServe() }()
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
f := mustOpen("uploads_test.go")
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name())
assert.NotNil(t, fw)
assert.NoError(t, err)
- io.Copy(fw, f)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
- w.Close()
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -126,7 +174,12 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -163,22 +216,43 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8021", Handler: h}
- defer hs.Shutdown(context.Background())
-
- go func() { hs.ListenAndServe() }()
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
f := mustOpen("uploads_test.go")
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
fw, err := w.CreateFormFile("upload", f.Name())
assert.NotNil(t, fw)
assert.NoError(t, err)
- io.Copy(fw, f)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
- w.Close()
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -187,7 +261,12 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -224,22 +303,43 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
defer h.rr.Stop()
hs := &http.Server{Addr: ":8021", Handler: h}
- defer hs.Shutdown(context.Background())
-
- go func() { hs.ListenAndServe() }()
+ defer func() {
+ err := hs.Shutdown(context.Background())
+ if err != nil {
+ t.Errorf("error during the shutdown: error %v", err)
+ }
+ }()
+
+ go func() {
+ err := hs.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ t.Errorf("error listening the interface: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 10)
var mb bytes.Buffer
w := multipart.NewWriter(&mb)
f := mustOpen("uploads_test.go")
- defer f.Close()
+ defer func() {
+ err := f.Close()
+ if err != nil {
+ t.Errorf("failed to close a file: error %v", err)
+ }
+ }()
fw, err := w.CreateFormFile("upload", f.Name())
assert.NotNil(t, fw)
assert.NoError(t, err)
- io.Copy(fw, f)
+ _, err = io.Copy(fw, f)
+ if err != nil {
+ t.Errorf("error copying the file: error %v", err)
+ }
- w.Close()
+ err = w.Close()
+ if err != nil {
+ t.Errorf("error closing the file: error %v", err)
+ }
req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb)
assert.NoError(t, err)
@@ -248,7 +348,12 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error closing the Body: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -282,28 +387,47 @@ type fInfo struct {
MD5 string `json:"md5,omitempty"`
}
-func fileString(f string, err int, mime string) string {
- s, _ := os.Stat(f)
+func fileString(f string, errNo int, mime string) string {
+ s, err := os.Stat(f)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error stat the file, error: %v", err))
+ }
+
+ ff, err := os.Open(f)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error opening the file, error: %v", err))
+ }
+
+ defer func() {
+ er := ff.Close()
+ if er != nil {
+ fmt.Println(fmt.Errorf("error closing the file, error: %v", er))
+ }
+ }()
- ff, _ := os.Open(f)
- defer ff.Close()
h := md5.New()
- io.Copy(h, ff)
+ _, err = io.Copy(h, ff)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error copying the file, error: %v", err))
+ }
v := &fInfo{
Name: s.Name(),
Size: s.Size(),
- Error: err,
+ Error: errNo,
Mime: mime,
MD5: hex.EncodeToString(h.Sum(nil)),
}
- if err != 0 {
+ if errNo != 0 {
v.MD5 = ""
v.Size = 0
}
- r, _ := json.Marshal(v)
+ r, err := json.Marshal(v)
+ if err != nil {
+ fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err))
+ }
return string(r)
}
diff --git a/service/limit/config_test.go b/service/limit/config_test.go
index b8a6c0aa..b388791f 100644
--- a/service/limit/config_test.go
+++ b/service/limit/config_test.go
@@ -31,7 +31,10 @@ func Test_Controller_Default(t *testing.T) {
}
`}
c := &Config{}
- c.InitDefaults()
+ err := c.InitDefaults()
+ if err != nil {
+ t.Errorf("failed to InitDefaults: error %v", err)
+ }
assert.NoError(t, c.Hydrate(cfg))
assert.Equal(t, time.Second, c.Interval)
diff --git a/service/limit/controller.go b/service/limit/controller.go
index c11f4b91..24a158f7 100644
--- a/service/limit/controller.go
+++ b/service/limit/controller.go
@@ -66,7 +66,12 @@ func (c *controller) control(p roadrunner.Pool) {
// make sure worker still on initial request
if p.Remove(w, err) && w.State().NumExecs() == eID {
- go w.Kill()
+ go func() {
+ err := w.Kill()
+ if err != nil {
+ fmt.Printf("error killing worker with PID number: %d, created: %s", w.Pid, w.Created)
+ }
+ }()
c.report(EventExecTTL, w, err)
}
}
diff --git a/service/limit/service_test.go b/service/limit/service_test.go
index ade4abcc..8cb3d7dc 100644
--- a/service/limit/service_test.go
+++ b/service/limit/service_test.go
@@ -75,7 +75,12 @@ func Test_Service_PidEcho(t *testing.T) {
s, _ := c.Get(rrhttp.ID)
assert.NotNil(t, s)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -84,7 +89,12 @@ func Test_Service_PidEcho(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -129,7 +139,12 @@ func Test_Service_ListenerPlusTTL(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -140,7 +155,12 @@ func Test_Service_ListenerPlusTTL(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -194,7 +214,12 @@ func Test_Service_ListenerPlusIdleTTL(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -205,7 +230,12 @@ func Test_Service_ListenerPlusIdleTTL(t *testing.T) {
r, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
- defer r.Body.Close()
+ defer func() {
+ err := r.Body.Close()
+ if err != nil {
+ t.Errorf("error during the body closing: error %v", err)
+ }
+ }()
b, err := ioutil.ReadAll(r.Body)
assert.NoError(t, err)
@@ -261,7 +291,12 @@ func Test_Service_Listener_MaxExecTTL(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -313,7 +348,12 @@ func Test_Service_Listener_MaxMemoryUsage(t *testing.T) {
}
})
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -325,11 +365,17 @@ func Test_Service_Listener_MaxMemoryUsage(t *testing.T) {
for {
select {
case <-captured:
- http.DefaultClient.Do(req)
+ _, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Errorf("error during sending the http request: error %v", err)
+ }
assert.NotEqual(t, lastPID, getPID(s))
return
default:
- http.DefaultClient.Do(req)
+ _, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Errorf("error during sending the http request: error %v", err)
+ }
}
}
}
diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go
index ee8ef984..9e7de640 100644
--- a/service/metrics/rpc.go
+++ b/service/metrics/rpc.go
@@ -32,26 +32,26 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
return fmt.Errorf("undefined collector `%s`", m.Name)
}
- switch c.(type) {
+ switch c := c.(type) {
case prometheus.Gauge:
- c.(prometheus.Gauge).Add(m.Value)
+ c.Add(m.Value)
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value)
+ c.WithLabelValues(m.Labels...).Add(m.Value)
case prometheus.Counter:
- c.(prometheus.Counter).Add(m.Value)
+ c.Add(m.Value)
case *prometheus.CounterVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value)
+ c.WithLabelValues(m.Labels...).Add(m.Value)
default:
return fmt.Errorf("collector `%s` does not support method `Add`", m.Name)
@@ -74,16 +74,16 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
return fmt.Errorf("undefined collector `%s`", m.Name)
}
- switch c.(type) {
+ switch c := c.(type) {
case prometheus.Gauge:
- c.(prometheus.Gauge).Sub(m.Value)
+ c.Sub(m.Value)
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value)
+ c.WithLabelValues(m.Labels...).Sub(m.Value)
default:
return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name)
}
@@ -105,23 +105,23 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) {
return fmt.Errorf("undefined collector `%s`", m.Name)
}
- switch c.(type) {
+ switch c := c.(type) {
case *prometheus.SummaryVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ c.WithLabelValues(m.Labels...).Observe(m.Value)
case prometheus.Histogram:
- c.(prometheus.Histogram).Observe(m.Value)
+ c.Observe(m.Value)
case *prometheus.HistogramVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ c.WithLabelValues(m.Labels...).Observe(m.Value)
default:
return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name)
}
@@ -143,16 +143,16 @@ func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
return fmt.Errorf("undefined collector `%s`", m.Name)
}
- switch c.(type) {
+ switch c := c.(type) {
case prometheus.Gauge:
- c.(prometheus.Gauge).Set(m.Value)
+ c.Set(m.Value)
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
return fmt.Errorf("required labels for collector `%s`", m.Name)
}
- c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value)
+ c.WithLabelValues(m.Labels...).Set(m.Value)
default:
return fmt.Errorf("collector `%s` does not support method `Set`", m.Name)
diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go
index 136f031c..2468c083 100644
--- a/service/metrics/rpc_test.go
+++ b/service/metrics/rpc_test.go
@@ -42,7 +42,12 @@ func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.C
assert.True(t, s.(*Service).Enabled())
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 200)
client, err := rs.Client()
diff --git a/service/metrics/service.go b/service/metrics/service.go
index b581f041..6fa4da50 100644
--- a/service/metrics/service.go
+++ b/service/metrics/service.go
@@ -4,8 +4,10 @@ package metrics
import (
"context"
+ "fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
@@ -17,6 +19,7 @@ const ID = "metrics"
// Service to manage application metrics using Prometheus.
type Service struct {
cfg *Config
+ log *logrus.Logger
mu sync.Mutex
http *http.Server
collectors sync.Map
@@ -24,8 +27,9 @@ type Service struct {
}
// Init service.
-func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rpc.Service, log *logrus.Logger) (bool, error) {
s.cfg = cfg
+ s.log = log
s.registry = prometheus.NewRegistry()
s.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
@@ -78,7 +82,12 @@ func (s *Service) Serve() error {
)}
s.mu.Unlock()
- return s.http.ListenAndServe()
+ err = s.http.ListenAndServe()
+ if err == nil || err == http.ErrServerClosed {
+ return nil
+ }
+
+ return err
}
// Stop prometheus metrics service.
@@ -88,7 +97,13 @@ func (s *Service) Stop() {
if s.http != nil {
// gracefully stop server
- go s.http.Shutdown(context.Background())
+ go func() {
+ err := s.http.Shutdown(context.Background())
+ if err != nil {
+ // Function should be Stop() error
+ s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err))
+ }
+ }()
}
}
diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go
index 0cf6fd95..62e6f6d7 100644
--- a/service/metrics/service_test.go
+++ b/service/metrics/service_test.go
@@ -43,9 +43,16 @@ func get(url string) (string, *http.Response, error) {
if err != nil {
return "", nil, err
}
- defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
return string(b), r, err
}
@@ -63,7 +70,12 @@ func TestService_Serve(t *testing.T) {
s, _ := c.Get(ID)
assert.NotNil(t, s)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -94,7 +106,12 @@ func Test_ServiceCustomMetric(t *testing.T) {
assert.NoError(t, s.(*Service).Register(collector))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -127,7 +144,12 @@ func Test_ServiceCustomMetricMust(t *testing.T) {
s.(*Service).MustRegister(collector)
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -160,7 +182,12 @@ func Test_ConfiguredMetric(t *testing.T) {
assert.True(t, s.(*Service).Enabled())
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("error during the Serve: error %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
index af261698..6791128b 100644
--- a/service/rpc/config_test.go
+++ b/service/rpc/config_test.go
@@ -40,7 +40,12 @@ func TestConfig_Listener(t *testing.T) {
ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
- defer ln.Close()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
assert.Equal(t, "tcp", ln.Addr().Network())
assert.Equal(t, "[::]:18001", ln.Addr().String())
@@ -56,7 +61,12 @@ func TestConfig_ListenerUnix(t *testing.T) {
ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
- defer ln.Close()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
assert.Equal(t, "unix", ln.Addr().Network())
assert.Equal(t, "file.sock", ln.Addr().String())
@@ -86,12 +96,22 @@ func TestConfig_Dialer(t *testing.T) {
cfg := &Config{Listen: "tcp://:18001"}
ln, _ := cfg.Listener()
- defer ln.Close()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
- defer conn.Close()
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
assert.Equal(t, "tcp", conn.RemoteAddr().Network())
assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String())
@@ -105,12 +125,22 @@ func TestConfig_DialerUnix(t *testing.T) {
cfg := &Config{Listen: "unix://file.sock"}
ln, _ := cfg.Listener()
- defer ln.Close()
+ defer func() {
+ err := ln.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
- defer conn.Close()
+ defer func() {
+ err := conn.Close()
+ if err != nil {
+ t.Errorf("error closing the connection: error %v", err)
+ }
+ }()
assert.Equal(t, "unix", conn.RemoteAddr().Network())
assert.Equal(t, "file.sock", conn.RemoteAddr().String())
@@ -138,7 +168,10 @@ func Test_Config_DialerErrorMethod(t *testing.T) {
func Test_Config_Defaults(t *testing.T) {
c := &Config{}
- c.InitDefaults()
+ err := c.InitDefaults()
+ if err != nil {
+ t.Errorf("error during the InitDefaults: error %v", err)
+ }
assert.Equal(t, true, c.Enable)
assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen)
}
diff --git a/service/static/service_test.go b/service/static/service_test.go
index d345b138..309804cc 100644
--- a/service/static/service_test.go
+++ b/service/static/service_test.go
@@ -41,9 +41,17 @@ func get(url string) (string, *http.Response, error) {
if err != nil {
return "", nil, err
}
- defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", nil, err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", nil, err
+ }
+
return string(b), r, err
}
@@ -76,7 +84,12 @@ func Test_Files(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -129,7 +142,12 @@ func Test_Files_Disable(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -226,7 +244,12 @@ func Test_Files_Forbid(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -263,7 +286,12 @@ func Test_Files_Always(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -300,7 +328,12 @@ func Test_Files_NotFound(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -337,7 +370,12 @@ func Test_Files_Dir(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -374,7 +412,12 @@ func Test_Files_NotForbid(t *testing.T) {
}
}`}))
- go func() { c.Serve() }()
+ go func() {
+ err := c.Serve()
+ if err != nil {
+ t.Errorf("serve error: %v", err)
+ }
+ }()
time.Sleep(time.Millisecond * 100)
defer c.Stop()
@@ -391,10 +434,17 @@ func tmpDir() string {
func all(fn string) string {
f, _ := os.Open(fn)
- defer f.Close()
b := &bytes.Buffer{}
- io.Copy(b, f)
+ _, err := io.Copy(b, f)
+ if err != nil {
+ return ""
+ }
+
+ err = f.Close()
+ if err != nil {
+ return ""
+ }
return b.String()
}
diff --git a/socket_factory.go b/socket_factory.go
index 43059e8a..84515f64 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -51,7 +51,12 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
rl, err := f.findRelay(w, f.tout)
if err != nil {
- go func(w *Worker) { w.Kill() }(w)
+ go func(w *Worker) {
+ err := w.Kill()
+ if err != nil {
+ fmt.Println(fmt.Errorf("error killing the worker %v", err))
+ }
+ }(w)
if wErr := w.Wait(); wErr != nil {
if _, ok := wErr.(*exec.ExitError); ok {
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 56d9313e..9f74cf8c 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -14,7 +14,12 @@ func Test_Tcp_Start(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -29,7 +34,10 @@ func Test_Tcp_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
}
func Test_Tcp_StartCloseFactory(t *testing.T) {
@@ -44,7 +52,12 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
f := NewSocketFactory(ls, time.Minute)
- defer f.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
w, err := f.SpawnWorker(cmd)
assert.NoError(t, err)
@@ -54,7 +67,10 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
}
func Test_Tcp_StartError(t *testing.T) {
@@ -62,13 +78,21 @@ func Test_Tcp_StartError(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- cmd.Start()
+ err = cmd.Start()
+ if err != nil {
+ t.Errorf("error executing the command: error %v", err)
+ }
w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
assert.Error(t, err)
@@ -80,7 +104,12 @@ func Test_Tcp_Failboot(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -98,7 +127,12 @@ func Test_Tcp_Timeout(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -116,7 +150,12 @@ func Test_Tcp_Invalid(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -133,7 +172,12 @@ func Test_Tcp_Broken(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -147,7 +191,11 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
- defer w.Stop()
+
+ defer func() {
+ err = w.Stop()
+ assert.Error(t, err)
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -160,7 +208,12 @@ func Test_Tcp_Echo(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -171,7 +224,12 @@ func Test_Tcp_Echo(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -190,7 +248,12 @@ func Test_Unix_Start(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -205,7 +268,10 @@ func Test_Unix_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
}
func Test_Unix_Failboot(t *testing.T) {
@@ -215,7 +281,12 @@ func Test_Unix_Failboot(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -235,7 +306,12 @@ func Test_Unix_Timeout(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -255,7 +331,12 @@ func Test_Unix_Invalid(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -274,7 +355,12 @@ func Test_Unix_Broken(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -288,7 +374,12 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
- defer w.Stop()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -303,7 +394,12 @@ func Test_Unix_Echo(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
t.Skip("socket is busy")
}
@@ -314,7 +410,12 @@ func Test_Unix_Echo(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -329,7 +430,12 @@ func Test_Unix_Echo(t *testing.T) {
func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
ls, err := net.Listen("tcp", "localhost:9007")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
b.Skip("socket is busy")
}
@@ -345,14 +451,22 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
}
}()
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}
}
func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
ls, err := net.Listen("tcp", "localhost:9007")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
b.Skip("socket is busy")
}
@@ -361,9 +475,17 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
- w.Wait()
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting: %v", err)
+ }
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}()
- defer w.Stop()
for n := 0; n < b.N; n++ {
if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
@@ -379,7 +501,12 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
b.Skip("socket is busy")
}
@@ -395,7 +522,10 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
}
}()
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}
}
@@ -406,7 +536,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
- defer ls.Close()
+ defer func() {
+ err := ls.Close()
+ if err != nil {
+ b.Errorf("error closing the listener: error %v", err)
+ }
+ }()
} else {
b.Skip("socket is busy")
}
@@ -415,9 +550,17 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
go func() {
- w.Wait()
+ err := w.Wait()
+ if err != nil {
+ b.Errorf("error waiting: %v", err)
+ }
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ b.Errorf("error stopping the worker: error %v", err)
+ }
}()
- defer w.Stop()
for n := 0; n < b.N; n++ {
if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
diff --git a/static_pool.go b/static_pool.go
index 66b1366e..2186227b 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,7 +42,6 @@ type StaticPool struct {
workers []*Worker
// invalid declares set of workers to be removed from the pool.
- mur sync.Mutex
remove sync.Map
// pool is being destroyed
@@ -108,9 +107,7 @@ func (p *StaticPool) Workers() (workers []*Worker) {
p.muw.RLock()
defer p.muw.RUnlock()
- for _, w := range p.workers {
- workers = append(workers, w)
- }
+ workers = append(workers, p.workers...)
return workers
}
@@ -294,7 +291,12 @@ func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
- go w.Stop()
+ go func() {
+ err := w.Stop()
+ if err != nil {
+ p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
+ }()
select {
case <-w.waitDone:
diff --git a/static_pool_test.go b/static_pool_test.go
index a7e71fdb..fd2827cc 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -6,6 +6,7 @@ import (
"os/exec"
"runtime"
"strconv"
+ "strings"
"sync"
"testing"
"time"
@@ -23,12 +24,13 @@ func Test_NewPool(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
+
assert.Equal(t, cfg, p.Config())
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
}
func Test_StaticPool_Invalid(t *testing.T) {
@@ -62,10 +64,11 @@ func Test_StaticPool_Echo(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
+
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
res, err := p.Exec(&Payload{Body: []byte("hello")})
@@ -83,10 +86,11 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
+
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil})
@@ -104,10 +108,11 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
+
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")})
@@ -125,10 +130,10 @@ func Test_StaticPool_JobError(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
res, err := p.Exec(&Payload{Body: []byte("hello")})
@@ -145,14 +150,17 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
+ done := make(chan interface{})
p.Listen(func(e int, ctx interface{}) {
if err, ok := ctx.(error); ok {
- assert.Contains(t, err.Error(), "undefined_function()")
+ if strings.Contains(err.Error(), "undefined_function()") {
+ close(done)
+ }
}
})
@@ -160,6 +168,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
+
+ <-done
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
@@ -168,10 +178,10 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
NewPipeFactory(),
cfg,
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
res, err := p.Exec(&Payload{Body: []byte("hello")})
@@ -191,9 +201,11 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
})
// killing random worker and expecting pool to replace it
- p.muw.Lock()
- p.workers[0].cmd.Process.Kill()
- p.muw.Unlock()
+ err = p.Workers()[0].cmd.Process.Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
+
<-destructed
for _, w := range p.Workers() {
@@ -244,10 +256,10 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
var lastPID string
lastPID = strconv.Itoa(*p.Workers()[0].Pid)
@@ -279,10 +291,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
var lastPID string
lastPID = strconv.Itoa(*p.Workers()[0].Pid)
@@ -338,7 +350,13 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- go p.Exec(&Payload{Body: []byte("100")})
+ go func() {
+ _, err := p.Exec(&Payload{Body: []byte("100")})
+ if err != nil {
+ t.Errorf("error executing payload: error %v", err)
+ }
+
+ }()
time.Sleep(time.Millisecond * 10)
p.Destroy()
@@ -357,10 +375,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(t, err)
defer p.Destroy()
assert.NotNil(t, p)
- assert.NoError(t, err)
for _, w := range p.workers {
w.state.value = StateErrored
diff --git a/util/network.go b/util/network.go
index 70e38fdb..3f533023 100644
--- a/util/network.go
+++ b/util/network.go
@@ -2,6 +2,7 @@ package util
import (
"errors"
+ "fmt"
"net"
"strings"
"syscall"
@@ -11,15 +12,18 @@ import (
func CreateListener(address string) (net.Listener, error) {
dsn := strings.Split(address, "://")
if len(dsn) != 2 {
- return nil, errors.New("Invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
}
if dsn[0] != "unix" && dsn[0] != "tcp" {
- return nil, errors.New("Invalid Protocol (tcp://:6001, unix://file.sock)")
+ return nil, errors.New("invalid Protocol (tcp://:6001, unix://file.sock)")
}
if dsn[0] == "unix" {
- syscall.Unlink(dsn[1])
+ err := syscall.Unlink(dsn[1])
+ if err != nil {
+ return nil, fmt.Errorf("error during the unlink syscall: error %v", err)
+ }
}
return net.Listen(dsn[0], dsn[1])
diff --git a/worker.go b/worker.go
index a95a80b3..32f63554 100644
--- a/worker.go
+++ b/worker.go
@@ -105,7 +105,9 @@ func (w *Worker) Wait() error {
if runtime.GOOS != "windows" {
// windows handles processes and close pipes differently,
// we can ignore wait here as process.Wait() already being handled above
- w.cmd.Wait()
+ if err := w.cmd.Wait(); err != nil {
+ return err
+ }
}
if w.endState.Success() {
@@ -214,10 +216,16 @@ func (w *Worker) start() error {
defer w.mu.Unlock()
if w.rl != nil {
- w.rl.Close()
+ err := w.rl.Close()
+ if err != nil {
+ w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
}
- w.err.Close()
+ err := w.err.Close()
+ if err != nil {
+ w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ }
}
}()
diff --git a/worker_test.go b/worker_test.go
index c357b6e0..e8cbef90 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -19,7 +19,10 @@ func Test_GetState(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- w.Stop()
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
}
func Test_Kill(t *testing.T) {
@@ -35,7 +38,12 @@ func Test_Kill(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- w.Kill()
+ defer func() {
+ err := w.Kill()
+ if err != nil {
+ t.Errorf("error killing the worker: error %v", err)
+ }
+ }()
}
func Test_Echo(t *testing.T) {
@@ -45,7 +53,12 @@ func Test_Echo(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -64,7 +77,12 @@ func Test_BadPayload(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(nil)
@@ -103,7 +121,12 @@ func Test_String(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
assert.Contains(t, w.String(), "php tests/client.php echo pipes")
assert.Contains(t, w.String(), "ready")
@@ -117,7 +140,12 @@ func Test_Echo_Slow(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -138,7 +166,11 @@ func Test_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
- defer w.Stop()
+
+ defer func() {
+ err := w.Stop()
+ assert.Error(t, err)
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
assert.Nil(t, res)
@@ -163,7 +195,13 @@ func Test_Error(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
res, err := w.Exec(&Payload{Body: []byte("hello")})
assert.Nil(t, res)
@@ -180,14 +218,28 @@ func Test_NumExecs(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
- defer w.Stop()
+ defer func() {
+ err := w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the worker: error %v", err)
+ }
+ }()
- w.Exec(&Payload{Body: []byte("hello")})
+ _, err := w.Exec(&Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
assert.Equal(t, int64(1), w.State().NumExecs())
- w.Exec(&Payload{Body: []byte("hello")})
+ _, err = w.Exec(&Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
assert.Equal(t, int64(2), w.State().NumExecs())
- w.Exec(&Payload{Body: []byte("hello")})
+ _, err = w.Exec(&Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
assert.Equal(t, int64(3), w.State().NumExecs())
}