diff options
author | Paramtamtam <[email protected]> | 2019-12-24 10:48:49 +0500 |
---|---|---|
committer | GitHub <[email protected]> | 2019-12-24 10:48:49 +0500 |
commit | fbd2022729ab7ffe4eee8ba8b7357ba179e4d010 (patch) | |
tree | fa6b66b18259282b2efe00a41c445a3fa40aa701 | |
parent | 3f7d8bc007a4efdd48bb16253c2686006f0f9cda (diff) | |
parent | 75a817edcff22e721dcc3a7fa5590b866f630403 (diff) |
Merge branch 'master' into master
49 files changed, 2133 insertions, 492 deletions
diff --git a/.travis.yml b/.travis.yml index e98795fb..f3955eb9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,17 +29,19 @@ script: - go test ./service/health -race -v -coverprofile=health.txt -covermode=atomic after_success: - - bash <(curl -s https://codecov.io/bash) -f lib.txt - - bash <(curl -s https://codecov.io/bash) -f util.txt - - bash <(curl -s https://codecov.io/bash) -f service.txt - - bash <(curl -s https://codecov.io/bash) -f env.txt - - bash <(curl -s https://codecov.io/bash) -f rpc.txt - - bash <(curl -s https://codecov.io/bash) -f http.txt - - bash <(curl -s https://codecov.io/bash) -f static.txt - - bash <(curl -s https://codecov.io/bash) -f limit.txt - - bash <(curl -s https://codecov.io/bash) -f headers.txt - - bash <(curl -s https://codecov.io/bash) -f metrics.txt - - bash <(curl -s https://codecov.io/bash) -f health.txt + - curl https://codecov.io/bash -o codecov-bash + - chmod +x codecov-bash + - ./codecov-bash -f lib.txt + - ./codecov-bash -f util.txt + - ./codecov-bash -f service.txt + - ./codecov-bash -f env.txt + - ./codecov-bash -f rpc.txt + - ./codecov-bash -f http.txt + - ./codecov-bash -f static.txt + - ./codecov-bash -f limit.txt + - ./codecov-bash -f headers.txt + - ./codecov-bash -f metrics.txt + - ./codecov-bash -f health.txt jobs: include: diff --git a/CHANGELOG.md b/CHANGELOG.md index c5c4911b..5c8d9804 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ UNRELEASED ---------- - added `Dockerfile` for building RR binary file by [@tarampampam](https://github.com/tarampampam) (closes [issue #218](https://github.com/spiral/roadrunner/issues/218)) +v1.5.3 (23.12.2019) +------------------- +- metric and RPC ports are rotated in tests to avoid false positive +- massive test and source cleanup (more error handlers) by @ValeryPiashchynski +- "Server closed" error has been suppressed +- added the ability to specify any config value via JSON flag `-j` +- minor improvements in Travis pipeline +- bump the minimum TLS version to TLS 1.2 +- added `Strict-Transport-Security` header for TLS requests + v1.5.2 (05.12.2019) ------------------- - added support for symfony/console 5.0 by @coxa @@ -3,7 +3,7 @@ cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" # Pushes application version into the build information. -RR_VERSION=1.5.2 +RR_VERSION=1.5.3 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index d6929473..515e6419 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -33,6 +33,7 @@ import ( var ( cfgFile, workDir, logFormat string override []string + mergeJson string // Verbose enables verbosity mode (container specific). Verbose bool @@ -73,6 +74,7 @@ func init() { CLI.PersistentFlags().StringVarP(&logFormat, "logFormat", "l", "color", "select log formatter (color, json, plain)") CLI.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file (default is .rr.yaml)") CLI.PersistentFlags().StringVarP(&workDir, "workDir", "w", "", "work directory") + CLI.PersistentFlags().StringVarP(&mergeJson, "jsonConfig", "j", "", "merge json configuration") CLI.PersistentFlags().StringArrayVarP( &override, @@ -89,7 +91,7 @@ func init() { configureLogger(logFormat) - cfg, err := util.LoadConfig(cfgFile, []string{"."}, ".rr", override) + cfg, err := util.LoadConfig(cfgFile, []string{"."}, ".rr", override, mergeJson) if err != nil { Logger.Warnf("config: %s", err) return diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go index c754be46..1e8f53b2 100644 --- a/cmd/rr/cmd/serve.go +++ b/cmd/rr/cmd/serve.go @@ -36,7 +36,7 @@ func init() { RunE: serveHandler, }) - signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM, syscall.SIGINT) + signal.Notify(stopSignal, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) } func serveHandler(cmd *cobra.Command, args []string) error { diff --git a/cmd/util/config.go b/cmd/util/config.go index 0a8d6005..08e01a89 100644 --- a/cmd/util/config.go +++ b/cmd/util/config.go @@ -1,6 +1,7 @@ package util import ( + "bytes" "fmt" "github.com/spf13/viper" "github.com/spiral/roadrunner/service" @@ -30,7 +31,7 @@ func (w *ConfigWrapper) Unmarshal(out interface{}) error { } // LoadConfig config and merge it's values with set of flags. -func LoadConfig(cfgFile string, path []string, name string, flags []string) (*ConfigWrapper, error) { +func LoadConfig(cfgFile string, path []string, name string, flags []string, jsonConfig string) (*ConfigWrapper, error) { cfg := viper.New() if cfgFile != "" { @@ -68,14 +69,13 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*Co // If a cfg file is found, read it in. if err := cfg.ReadInConfig(); err != nil { - if len(flags) == 0 { + if len(flags) == 0 && jsonConfig == "" { return nil, err } } // merge included configs if include, ok := cfg.Get("include").([]interface{}); ok { - for _, file := range include { filename, ok := file.(string) if !ok { @@ -117,6 +117,23 @@ func LoadConfig(cfgFile string, path []string, name string, flags []string) (*Co } } + if jsonConfig != "" { + jConfig := viper.New() + jConfig.AutomaticEnv() + jConfig.SetEnvPrefix("rr") + jConfig.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + + jConfig.SetConfigType("json") + if err := jConfig.ReadConfig(bytes.NewBufferString(jsonConfig)); err != nil { + return nil, err + } + + // merging + if err := cfg.MergeConfigMap(jConfig.AllSettings()); err != nil { + return nil, err + } + } + merged := viper.New() // we have to copy all the merged values into new config in order normalize it (viper bug?) diff --git a/error_buffer_test.go b/error_buffer_test.go index 81107935..c163ea43 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -7,16 +7,29 @@ import ( func TestErrBuffer_Write_Len(t *testing.T) { buf := newErrBuffer() - defer buf.Close() - - buf.Write([]byte("hello")) + defer func() { + err := buf.Close() + if err != nil { + t.Errorf("error during closing the buffer: error %v", err) + } + }() + + _, err := buf.Write([]byte("hello")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } assert.Equal(t, 5, buf.Len()) assert.Equal(t, "hello", buf.String()) } func TestErrBuffer_Write_Event(t *testing.T) { buf := newErrBuffer() - defer buf.Close() + defer func() { + err := buf.Close() + if err != nil { + t.Errorf("error during closing the buffer: error %v", err) + } + }() tr := make(chan interface{}) buf.Listen(func(event int, ctx interface{}) { @@ -25,8 +38,10 @@ func TestErrBuffer_Write_Event(t *testing.T) { close(tr) }) - buf.Write([]byte("hello\n")) - + _, err := buf.Write([]byte("hello\n")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } <-tr // messages are read @@ -35,7 +50,12 @@ func TestErrBuffer_Write_Event(t *testing.T) { func TestErrBuffer_Write_Event_Separated(t *testing.T) { buf := newErrBuffer() - defer buf.Close() + defer func() { + err := buf.Close() + if err != nil { + t.Errorf("error during closing the buffer: error %v", err) + } + }() tr := make(chan interface{}) buf.Listen(func(event int, ctx interface{}) { @@ -44,9 +64,20 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { close(tr) }) - buf.Write([]byte("hel")) - buf.Write([]byte("lo\n")) - buf.Write([]byte("ending")) + _, err := buf.Write([]byte("hel")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } + + _, err = buf.Write([]byte("lo\n")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } + + _, err = buf.Write([]byte("ending")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } <-tr assert.Equal(t, 0, buf.Len()) @@ -55,11 +86,27 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { buf := newErrBuffer() - defer buf.Close() - - buf.Write([]byte("hel")) - buf.Write([]byte("lo\n")) - buf.Write([]byte("ending")) + defer func() { + err := buf.Close() + if err != nil { + t.Errorf("error during closing the buffer: error %v", err) + } + }() + + _, err := buf.Write([]byte("hel")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } + + _, err = buf.Write([]byte("lo\n")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } + + _, err = buf.Write([]byte("ending")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } assert.Equal(t, 12, buf.Len()) assert.Equal(t, "hello\nending", buf.String()) @@ -67,9 +114,17 @@ func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { func TestErrBuffer_Write_Remaining(t *testing.T) { buf := newErrBuffer() - defer buf.Close() - - buf.Write([]byte("hel")) + defer func() { + err := buf.Close() + if err != nil { + t.Errorf("error during closing the buffer: error %v", err) + } + }() + + _, err := buf.Write([]byte("hel")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } assert.Equal(t, 3, buf.Len()) assert.Equal(t, "hel", buf.String()) diff --git a/pipe_factory.go b/pipe_factory.go index d6fe0420..d8243d28 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -1,6 +1,7 @@ package roadrunner import ( + "fmt" "github.com/pkg/errors" "github.com/spiral/goridge" "io" @@ -45,11 +46,20 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { } if pid, err := fetchPID(w.rl); pid != *w.Pid { - go func(w *Worker) { w.Kill() }(w) + go func(w *Worker) { + err := w.Kill() + if err != nil { + // there is no logger here, how to handle error in goroutines ? + fmt.Println(fmt.Sprintf("error killing the worker with PID number %d, Created: %s", w.Pid, w.Created)) + } + }(w) if wErr := w.Wait(); wErr != nil { if _, ok := wErr.(*exec.ExitError); ok { - err = errors.Wrap(wErr, err.Error()) + // error might be nil here + if err != nil { + err = errors.Wrap(wErr, err.Error()) + } } else { err = wErr } diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 9d50e47f..27d1f74d 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -17,12 +17,15 @@ func Test_Pipe_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - w.Stop() + assert.NoError(t, w.Stop()) } func Test_Pipe_StartError(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - cmd.Start() + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) @@ -31,7 +34,10 @@ func Test_Pipe_StartError(t *testing.T) { func Test_Pipe_PipeError(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - cmd.StdinPipe() + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) @@ -40,7 +46,10 @@ func Test_Pipe_PipeError(t *testing.T) { func Test_Pipe_PipeError2(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - cmd.StdoutPipe() + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } w, err := NewPipeFactory().SpawnWorker(cmd) assert.Error(t, err) @@ -71,7 +80,12 @@ func Test_Pipe_Echo(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -93,7 +107,10 @@ func Test_Pipe_Broken(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() - defer w.Stop() + defer func() { + err := w.Stop() + assert.Error(t, err) + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -112,7 +129,10 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { } }() - w.Stop() + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } } } @@ -121,9 +141,17 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { w, _ := NewPipeFactory().SpawnWorker(cmd) go func() { - w.Wait() + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } }() - defer w.Stop() for n := 0; n < b.N; n++ { if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { diff --git a/protocol.go b/protocol.go index 5523a3e5..42649264 100644 --- a/protocol.go +++ b/protocol.go @@ -34,6 +34,9 @@ func fetchPID(rl goridge.Relay) (pid int, err error) { } body, p, err := rl.Receive() + if err != nil { + return 0, err + } if !p.HasFlag(goridge.PayloadControl) { return 0, fmt.Errorf("unexpected response, header is missing") } diff --git a/server_config.go b/server_config.go index 35965962..5403ff01 100644 --- a/server_config.go +++ b/server_config.go @@ -8,15 +8,22 @@ import ( "os" "os/exec" "strings" + "sync" "syscall" "time" ) +// CommandProducer can produce commands. +type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd + // ServerConfig config combines factory, pool and cmd configurations. type ServerConfig struct { // Command includes command strings with all the parameters, example: "php worker.php pipes". Command string + // CommandProducer overwrites + CommandProducer CommandProducer + // Relay defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" // This config section must not change on re-configuration. @@ -31,7 +38,8 @@ type ServerConfig struct { Pool *Config // values defines set of values to be passed to the command context. - env []string + mu sync.Mutex + env map[string]string } // InitDefaults sets missing values to their default values. @@ -68,18 +76,42 @@ func (cfg *ServerConfig) Differs(new *ServerConfig) bool { // SetEnv sets new environment variable. Value is automatically uppercase-d. func (cfg *ServerConfig) SetEnv(k, v string) { - cfg.env = append(cfg.env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.env == nil { + cfg.env = make(map[string]string) + } + + cfg.env[k] = v +} + +// GetEnv must return list of env variables. +func (cfg *ServerConfig) GetEnv() (env []string) { + env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay)) + for k, v := range cfg.env { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return } // makeCommands returns new command provider based on configured options. func (cfg *ServerConfig) makeCommand() func() *exec.Cmd { + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.CommandProducer != nil { + return cfg.CommandProducer(cfg) + } + var cmd = strings.Split(cfg.Command, " ") return func() *exec.Cmd { cmd := exec.Command(cmd[0], cmd[1:]...) osutil.IsolateProcess(cmd) - cmd.Env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay)) - cmd.Env = append(cmd.Env, cfg.env...) + cmd.Env = cfg.GetEnv() + return cmd } } @@ -95,8 +127,11 @@ func (cfg *ServerConfig) makeFactory() (Factory, error) { return nil, errors.New("invalid relay DSN (pipes, tcp://:6001, unix://rr.sock)") } - if dsn[0] == "unix" { - syscall.Unlink(dsn[1]) + if dsn[0] == "unix" && fileExists(dsn[1]) { + err := syscall.Unlink(dsn[1]) + if err != nil { + return nil, err + } } ln, err := net.Listen(dsn[0], dsn[1]) @@ -106,3 +141,13 @@ func (cfg *ServerConfig) makeFactory() (Factory, error) { return NewSocketFactory(ln, cfg.RelayTimeout), nil } + +// fileExists checks if a file exists and is not a directory before we +// try using it to prevent further errors. +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} diff --git a/server_config_test.go b/server_config_test.go index 753da599..303eba90 100644 --- a/server_config_test.go +++ b/server_config_test.go @@ -18,7 +18,12 @@ func Test_ServerConfig_PipeFactory(t *testing.T) { f, err = cfg.makeFactory() assert.NoError(t, err) assert.NotNil(t, f) - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("error closing factory or underlying connections: error %v", err) + } + }() assert.NoError(t, err) assert.IsType(t, &PipeFactory{}, f) @@ -26,21 +31,32 @@ func Test_ServerConfig_PipeFactory(t *testing.T) { func Test_ServerConfig_SocketFactory(t *testing.T) { cfg := &ServerConfig{Relay: "tcp://:9111"} - f, err := cfg.makeFactory() + f1, err := cfg.makeFactory() assert.NoError(t, err) - assert.NotNil(t, f) - defer f.Close() + assert.NotNil(t, f1) + defer func() { + err := f1.Close() + + if err != nil { + t.Errorf("error closing factory or underlying connections: error %v", err) + } + }() assert.NoError(t, err) - assert.IsType(t, &SocketFactory{}, f) - assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network()) - assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String()) + assert.IsType(t, &SocketFactory{}, f1) + assert.Equal(t, "tcp", f1.(*SocketFactory).ls.Addr().Network()) + assert.Equal(t, "[::]:9111", f1.(*SocketFactory).ls.Addr().String()) cfg = &ServerConfig{Relay: "tcp://localhost:9112"} - f, err = cfg.makeFactory() + f, err := cfg.makeFactory() assert.NoError(t, err) assert.NotNil(t, f) - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("error closing factory or underlying connections: error %v", err) + } + }() assert.NoError(t, err) assert.IsType(t, &SocketFactory{}, f) @@ -55,7 +71,16 @@ func Test_ServerConfig_UnixSocketFactory(t *testing.T) { cfg := &ServerConfig{Relay: "unix://unix.sock"} f, err := cfg.makeFactory() - defer f.Close() + if err != nil { + t.Error(err) + } + + defer func() { + err := f.Close() + if err != nil { + t.Errorf("error closing factory or underlying connections: error %v", err) + } + }() assert.NoError(t, err) assert.IsType(t, &SocketFactory{}, f) @@ -131,7 +156,10 @@ func Test_ServerConfigDefaults(t *testing.T) { Command: "php tests/client.php pipes", } - cfg.InitDefaults() + err := cfg.InitDefaults() + if err != nil { + t.Errorf("error during the InitDefaults: error %v", err) + } assert.Equal(t, "pipes", cfg.Relay) assert.Equal(t, time.Minute, cfg.Pool.AllocateTimeout) diff --git a/server_test.go b/server_test.go index c973d634..9ab480b1 100644 --- a/server_test.go +++ b/server_test.go @@ -205,7 +205,10 @@ func TestServer_ReplacePool(t *testing.T) { } }) - rr.Reset() + err := rr.Reset() + if err != nil { + t.Errorf("error resetting the pool: error %v", err) + } <-constructed for _, w := range rr.Workers() { @@ -239,9 +242,11 @@ func TestServer_ServerFailure(t *testing.T) { rr.pool.(*StaticPool).cmd = func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "broken-connection") } - // killing random worker and expecting pool to replace it - rr.Workers()[0].cmd.Process.Kill() + err := rr.Workers()[0].cmd.Process.Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } <-failure assert.True(t, true) diff --git a/service/container.go b/service/container.go index a21b49b4..742b4c3b 100644 --- a/service/container.go +++ b/service/container.go @@ -46,6 +46,9 @@ type Container interface { // Close all active services. Stop() + + // List service names. + List() []string } // Config provides ability to slice configuration sections and unmarshal configuration data into @@ -212,6 +215,16 @@ func (c *container) Stop() { } } +// List all service names. +func (c *container) List() []string { + names := make([]string, 0, len(c.services)) + for _, e := range c.services { + names = append(names, e.name) + } + + return names +} + // calls Init method with automatically resolved arguments. func (c *container) initService(s interface{}, segment Config) (bool, error) { r := reflect.TypeOf(s) @@ -263,7 +276,10 @@ func (c *container) resolveValues(s interface{}, m reflect.Method, cfg Config) ( sc := reflect.New(v.Elem()) if dsc, ok := sc.Interface().(DefaultsConfig); ok { - dsc.InitDefaults() + err := dsc.InitDefaults() + if err != nil { + return nil, err + } if cfg == nil { values = append(values, sc) continue diff --git a/service/container_test.go b/service/container_test.go index 8fcaede2..ad4c1e64 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -67,7 +67,10 @@ type testCfg struct{ cfg string } func (cfg *testCfg) Get(name string) Config { vars := make(map[string]interface{}) - json.Unmarshal([]byte(cfg.cfg), &vars) + err := json.Unmarshal([]byte(cfg.cfg), &vars) + if err != nil { + panic("error unmarshalling the cfg.cfg value") + } v, ok := vars[name] if !ok { @@ -129,6 +132,20 @@ func TestContainer_Has(t *testing.T) { assert.False(t, c.Has("another")) } +func TestContainer_List(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 0, len(hook.Entries)) + assert.Equal(t, 1, len(c.List())) + + assert.True(t, c.Has("test")) + assert.False(t, c.Has("another")) +} + func TestContainer_Get(t *testing.T) { logger, hook := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -425,6 +442,10 @@ func TestContainer_InitErrorB(t *testing.T) { type testInitC struct{} +func (r *testInitC) Test() bool { + return true +} + func TestContainer_NoInit(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -448,7 +469,6 @@ func (c *DCfg) Hydrate(cfg Config) error { if err := cfg.Unmarshal(c); err != nil { return err } - if c.V == "fail" { return errors.New("failed config") } diff --git a/service/env/config_test.go b/service/env/config_test.go index 50fbdaa5..226712c3 100644 --- a/service/env/config_test.go +++ b/service/env/config_test.go @@ -30,6 +30,9 @@ func Test_Config_Hydrate_Empty(t *testing.T) { func Test_Config_Defaults(t *testing.T) { c := &Config{} - c.InitDefaults() + err := c.InitDefaults() + if err != nil { + t.Errorf("Test_Config_Defaults failed: error %v", err) + } assert.Len(t, c.Values, 0) } diff --git a/service/env/service_test.go b/service/env/service_test.go index c20bb76c..19cc03c7 100644 --- a/service/env/service_test.go +++ b/service/env/service_test.go @@ -11,8 +11,12 @@ func Test_NewService(t *testing.T) { } func Test_Init(t *testing.T) { + var err error s := &Service{} - s.Init(&Config{}) + _, err = s.Init(&Config{}) + if err != nil { + t.Errorf("error during the s.Init: error %v", err) + } assert.Len(t, s.values, 1) values, err := s.GetEnv() @@ -21,9 +25,13 @@ func Test_Init(t *testing.T) { } func Test_Extend(t *testing.T) { + var err error s := NewService(map[string]string{"RR": "version"}) - s.Init(&Config{Values: map[string]string{"key": "value"}}) + _, err = s.Init(&Config{Values: map[string]string{"key": "value"}}) + if err != nil { + t.Errorf("error during the s.Init: error %v", err) + } assert.Len(t, s.values, 2) values, err := s.GetEnv() @@ -34,9 +42,13 @@ func Test_Extend(t *testing.T) { } func Test_Set(t *testing.T) { + var err error s := NewService(map[string]string{"RR": "version"}) - s.Init(&Config{Values: map[string]string{"key": "value"}}) + _, err = s.Init(&Config{Values: map[string]string{"key": "value"}}) + if err != nil { + t.Errorf("error during the s.Init: error %v", err) + } assert.Len(t, s.values, 2) s.SetEnv("key", "value-new") diff --git a/service/headers/service_test.go b/service/headers/service_test.go index 250f4458..2f29db5e 100644 --- a/service/headers/service_test.go +++ b/service/headers/service_test.go @@ -59,7 +59,12 @@ func Test_RequestHeaders(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -68,7 +73,12 @@ func Test_RequestHeaders(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -103,7 +113,12 @@ func Test_ResponseHeaders(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -112,7 +127,12 @@ func Test_ResponseHeaders(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() assert.Equal(t, "output-header", r.Header.Get("output")) @@ -157,7 +177,12 @@ func TestCORS_OPTIONS(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -166,7 +191,12 @@ func TestCORS_OPTIONS(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) @@ -215,7 +245,12 @@ func TestCORS_Pass(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -224,7 +259,12 @@ func TestCORS_Pass(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) diff --git a/service/health/service.go b/service/health/service.go index c0be68e0..c82f43b5 100644 --- a/service/health/service.go +++ b/service/health/service.go @@ -2,6 +2,8 @@ package health import ( "context" + "fmt" + "github.com/sirupsen/logrus" "net/http" "sync" @@ -14,19 +16,21 @@ const ID = "health" // Service to serve an endpoint for checking the health of the worker pool type Service struct { cfg *Config + log *logrus.Logger mu sync.Mutex http *http.Server httpService *rrhttp.Service } // Init health service -func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) { +func (s *Service) Init(cfg *Config, r *rrhttp.Service, log *logrus.Logger) (bool, error) { // Ensure the httpService is set if r == nil { return false, nil } s.cfg = cfg + s.log = log s.httpService = r return true, nil } @@ -37,7 +41,13 @@ func (s *Service) Serve() error { s.mu.Lock() s.http = &http.Server{Addr: s.cfg.Address, Handler: s} s.mu.Unlock() - return s.http.ListenAndServe() + + err := s.http.ListenAndServe() + if err == nil || err == http.ErrServerClosed { + return nil + } + + return err } // Stop the health endpoint @@ -47,7 +57,12 @@ func (s *Service) Stop() { if s.http != nil { // gracefully stop the server - go s.http.Shutdown(context.Background()) + go func() { + err := s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err)) + } + }() } } diff --git a/service/health/service_test.go b/service/health/service_test.go index 76462df9..e8685548 100644 --- a/service/health/service_test.go +++ b/service/health/service_test.go @@ -66,7 +66,12 @@ func TestService_Serve(t *testing.T) { assert.NotNil(t, hS) assert.Equal(t, service.StatusOK, httpStatus) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 500) defer c.Stop() @@ -85,10 +90,10 @@ func TestService_Serve_DeadWorker(t *testing.T) { assert.NoError(t, c.Init(&testCfg{ healthCfg: `{ - "address": "localhost:2116" + "address": "localhost:2117" }`, httpCfg: `{ - "address": "localhost:2115", + "address": "localhost:2118", "workers":{ "command": "php ../../tests/http/slow-client.php echo pipes 1000", "pool": {"numWorkers": 1} @@ -104,16 +109,24 @@ func TestService_Serve_DeadWorker(t *testing.T) { assert.NotNil(t, hS) assert.Equal(t, service.StatusOK, httpStatus) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("server error: %v", err) + } + }() time.Sleep(time.Millisecond * 500) defer c.Stop() // Kill the worker httpSvc := hS.(*rrhttp.Service) - httpSvc.Server().Workers()[0].Kill() + err := httpSvc.Server().Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the worker: error %v", err) + } // Check health check - _, res, err := get("http://localhost:2116/") + _, res, err := get("http://localhost:2117/") assert.NoError(t, err) assert.Equal(t, http.StatusInternalServerError, res.StatusCode) } @@ -128,10 +141,10 @@ func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) { assert.NoError(t, c.Init(&testCfg{ healthCfg: `{ - "address": "localhost:2116" + "address": "localhost:2119" }`, httpCfg: `{ - "address": "localhost:2115", + "address": "localhost:2120", "workers":{ "command": "php ../../tests/http/client.php echo pipes", "pool": {"numWorkers": 2} @@ -147,16 +160,24 @@ func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) { assert.NotNil(t, hS) assert.Equal(t, service.StatusOK, httpStatus) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Second * 1) defer c.Stop() // Kill one of the workers httpSvc := hS.(*rrhttp.Service) - httpSvc.Server().Workers()[0].Kill() + err := httpSvc.Server().Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the worker: error %v", err) + } // Check health check - _, res, err := get("http://localhost:2116/") + _, res, err := get("http://localhost:2119/") assert.NoError(t, err) assert.Equal(t, http.StatusOK, res.StatusCode) } @@ -170,7 +191,7 @@ func TestService_Serve_NoHTTPService(t *testing.T) { assert.NoError(t, c.Init(&testCfg{ healthCfg: `{ - "address": "localhost:2116" + "address": "localhost:2121" }`, })) @@ -191,10 +212,10 @@ func TestService_Serve_NoServer(t *testing.T) { assert.NoError(t, c.Init(&testCfg{ healthCfg: `{ - "address": "localhost:2116" + "address": "localhost:2122" }`, httpCfg: `{ - "address": "localhost:2115", + "address": "localhost:2123", "workers":{ "command": "php ../../tests/http/client.php echo pipes", "pool": {"numWorkers": 1} @@ -210,14 +231,19 @@ func TestService_Serve_NoServer(t *testing.T) { assert.NotNil(t, hS) assert.Equal(t, service.StatusOK, httpStatus) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 500) defer c.Stop() // Set the httpService to nil healthSvc.httpService = nil - _, res, err := get("http://localhost:2116/") + _, res, err := get("http://localhost:2122/") assert.NoError(t, err) assert.Equal(t, http.StatusInternalServerError, res.StatusCode) } @@ -234,10 +260,10 @@ func TestService_Serve_NoPool(t *testing.T) { assert.NoError(t, c.Init(&testCfg{ healthCfg: `{ - "address": "localhost:2116" + "address": "localhost:2124" }`, httpCfg: `{ - "address": "localhost:2115", + "address": "localhost:2125", "workers":{ "command": "php ../../tests/http/client.php echo pipes", "pool": {"numWorkers": 1} @@ -253,14 +279,19 @@ func TestService_Serve_NoPool(t *testing.T) { assert.NotNil(t, hS) assert.Equal(t, service.StatusOK, httpStatus) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 500) defer c.Stop() // Stop the pool httpSvc.Server().Stop() - _, res, err := get("http://localhost:2116/") + _, res, err := get("http://localhost:2124/") assert.NoError(t, err) assert.Equal(t, http.StatusInternalServerError, res.StatusCode) } @@ -271,8 +302,15 @@ func get(url string) (string, *http.Response, error) { if err != nil { return "", nil, err } - defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } return string(b), r, err } diff --git a/service/http/attributes/attributes_test.go b/service/http/attributes/attributes_test.go index a71d6542..2360fd12 100644 --- a/service/http/attributes/attributes_test.go +++ b/service/http/attributes/attributes_test.go @@ -10,7 +10,10 @@ func TestAllAttributes(t *testing.T) { r := &http.Request{} r = Init(r) - Set(r, "key", "value") + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } assert.Equal(t, All(r), map[string]interface{}{ "key": "value", @@ -34,7 +37,10 @@ func TestGetAttribute(t *testing.T) { r := &http.Request{} r = Init(r) - Set(r, "key", "value") + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } assert.Equal(t, Get(r, "key"), "value") } @@ -55,13 +61,19 @@ func TestSetAttribute(t *testing.T) { r := &http.Request{} r = Init(r) - Set(r, "key", "value") + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } assert.Equal(t, Get(r, "key"), "value") } func TestSetAttributeNone(t *testing.T) { r := &http.Request{} - Set(r, "key", "value") + err := Set(r, "key", "value") + if err != nil { + t.Errorf("error during the Set: error %v", err) + } assert.Equal(t, Get(r, "key"), nil) } diff --git a/service/http/config.go b/service/http/config.go index ba3e6300..13a2cfc9 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -126,9 +126,18 @@ func (c *Config) Hydrate(cfg service.Config) error { c.SSL.Port = 443 } - c.HTTP2.InitDefaults() - c.Uploads.InitDefaults() - c.Workers.InitDefaults() + err := c.HTTP2.InitDefaults() + if err != nil { + return err + } + err = c.Uploads.InitDefaults() + if err != nil { + return err + } + err = c.Workers.InitDefaults() + if err != nil { + return err + } if err := cfg.Unmarshal(c); err != nil { return err diff --git a/service/http/h2c_test.go b/service/http/h2c_test.go index d806e5ff..7bbc30ac 100644 --- a/service/http/h2c_test.go +++ b/service/http/h2c_test.go @@ -36,7 +36,12 @@ func Test_Service_H2C(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error serving: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -49,7 +54,12 @@ func Test_Service_H2C(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("fail to close the Body: error %v", err) + } + }() assert.Equal(t, "101 Switching Protocols", r.Status) diff --git a/service/http/handler.go b/service/http/handler.go index a4da224d..3c667035 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -2,6 +2,7 @@ package http import ( "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "net" "net/http" @@ -59,6 +60,7 @@ func (e *ResponseEvent) Elapsed() time.Duration { // parsed files and query, payload will include parsed form dataTree (if any). type Handler struct { cfg *Config + log *logrus.Logger rr *roadrunner.Server mul sync.Mutex lsn func(event int, ctx interface{}) @@ -98,8 +100,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // proxy IP resolution h.resolveIP(req) - req.Open() - defer req.Close() + req.Open(h.log) + defer req.Close(h.log) p, err := req.Payload() if err != nil { @@ -120,7 +122,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } h.handleResponse(req, resp, start) - resp.Write(w) + err = resp.Write(w) + if err != nil { + h.handleError(w, r, err, start) + } } // handleError sends error. @@ -128,7 +133,10 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error, h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) w.WriteHeader(500) - w.Write([]byte(err.Error())) + _, err = w.Write([]byte(err.Error())) + if err != nil { + h.throw(EventError, &ErrorEvent{Request: r, Error: err, start: start, elapsed: time.Since(start)}) + } } // handleResponse triggers response event. diff --git a/service/http/handler_test.go b/service/http/handler_test.go index e29b76ac..994a663c 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -23,9 +23,15 @@ func get(url string) (string, *http.Response, error) { if err != nil { return "", nil, err } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } return string(b), r, err } @@ -44,9 +50,16 @@ func getHeader(url string, h map[string]string) (string, *http.Response, error) if err != nil { return "", nil, err } - defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } return string(b), r, err } @@ -74,9 +87,19 @@ func TestHandler_Echo(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) body, r, err := get("http://localhost:8177/?hello=world") @@ -165,10 +188,20 @@ func TestHandler_Headers(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8078", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() - time.Sleep(time.Millisecond * 10) + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) assert.NoError(t, err) @@ -177,7 +210,13 @@ func TestHandler_Headers(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -212,9 +251,19 @@ func TestHandler_Empty_User_Agent(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8088", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) @@ -224,7 +273,13 @@ func TestHandler_Empty_User_Agent(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -258,9 +313,19 @@ func TestHandler_User_Agent(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8088", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest("GET", "http://localhost:8088?hello=world", nil) @@ -270,7 +335,13 @@ func TestHandler_User_Agent(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -304,9 +375,19 @@ func TestHandler_Cookies(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8079", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest("GET", "http://localhost:8079", nil) @@ -316,7 +397,13 @@ func TestHandler_Cookies(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -355,9 +442,19 @@ func TestHandler_JsonPayload_POST(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8090", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest( @@ -371,7 +468,13 @@ func TestHandler_JsonPayload_POST(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -405,9 +508,19 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8081", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) @@ -417,7 +530,12 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -451,9 +569,19 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8082", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, bytes.NewBufferString(`{"key":"value"}`)) @@ -463,7 +591,13 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -497,9 +631,19 @@ func TestHandler_FormData_POST(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8083", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) form := url.Values{} @@ -520,7 +664,13 @@ func TestHandler_FormData_POST(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -555,9 +705,19 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8083", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) form := url.Values{} @@ -572,7 +732,13 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -607,9 +773,19 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8083", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) form := url.Values{} @@ -630,7 +806,13 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -665,9 +847,19 @@ func TestHandler_FormData_PUT(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8084", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) form := url.Values{} @@ -688,7 +880,13 @@ func TestHandler_FormData_PUT(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -723,9 +921,19 @@ func TestHandler_FormData_PATCH(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8085", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) form := url.Values{} @@ -746,7 +954,13 @@ func TestHandler_FormData_PATCH(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -781,25 +995,73 @@ func TestHandler_Multipart_POST(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8019", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) - w.WriteField("key", "value") + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name1") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } - w.WriteField("key", "value") - w.WriteField("name[]", "name1") - w.WriteField("name[]", "name2") - w.WriteField("name[]", "name3") - w.WriteField("arr[x][y][z]", "y") - w.WriteField("arr[x][y][e]", "f") - w.WriteField("arr[c]p", "l") - w.WriteField("arr[c]z", "") + err = w.WriteField("name[]", "name2") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } - w.Close() + err = w.WriteField("arr[x][y][e]", "f") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -808,7 +1070,13 @@ func TestHandler_Multipart_POST(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -843,25 +1111,73 @@ func TestHandler_Multipart_PUT(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8020", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) - w.WriteField("key", "value") + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } - w.WriteField("key", "value") - w.WriteField("name[]", "name1") - w.WriteField("name[]", "name2") - w.WriteField("name[]", "name3") - w.WriteField("arr[x][y][z]", "y") - w.WriteField("arr[x][y][e]", "f") - w.WriteField("arr[c]p", "l") - w.WriteField("arr[c]z", "") + err = w.WriteField("name[]", "name1") - w.Close() + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name2") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][e]", "f") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } req, err := http.NewRequest("PUT", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -870,7 +1186,13 @@ func TestHandler_Multipart_PUT(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -905,25 +1227,75 @@ func TestHandler_Multipart_PATCH(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8021", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() - go func() { hs.ListenAndServe() }() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) - w.WriteField("key", "value") + err := w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("key", "value") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } - w.WriteField("key", "value") - w.WriteField("name[]", "name1") - w.WriteField("name[]", "name2") - w.WriteField("name[]", "name3") - w.WriteField("arr[x][y][z]", "y") - w.WriteField("arr[x][y][e]", "f") - w.WriteField("arr[c]p", "l") - w.WriteField("arr[c]z", "") + err = w.WriteField("name[]", "name1") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } - w.Close() + err = w.WriteField("name[]", "name2") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("name[]", "name3") + + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][z]", "y") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[x][y][e]", "f") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]p", "l") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.WriteField("arr[c]z", "") + if err != nil { + t.Errorf("error writing the field: error %v", err) + } + + err = w.Close() + if err != nil { + t.Errorf("error closing the writer: error %v", err) + } req, err := http.NewRequest("PATCH", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -932,7 +1304,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -967,9 +1345,19 @@ func TestHandler_Error(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) _, r, err := get("http://localhost:8177/?hello=world") @@ -1001,9 +1389,19 @@ func TestHandler_Error2(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) _, r, err := get("http://localhost:8177/?hello=world") @@ -1035,9 +1433,19 @@ func TestHandler_Error3(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) b2 := &bytes.Buffer{} @@ -1050,7 +1458,13 @@ func TestHandler_Error3(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the closing Body: error %v", err) + + } + }() assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) @@ -1080,9 +1494,19 @@ func TestHandler_ResponseDuration(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) gotresp := make(chan interface{}) @@ -1129,9 +1553,19 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) gotresp := make(chan interface{}) @@ -1178,9 +1612,19 @@ func TestHandler_ErrorDuration(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) goterr := make(chan interface{}) @@ -1231,15 +1675,28 @@ func TestHandler_IP(t *testing.T) { }), } - h.cfg.parseCIDRs() + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } assert.NoError(t, h.rr.Start()) defer h.rr.Stop() hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) body, r, err := get("http://127.0.0.1:8177/") @@ -1277,15 +1734,28 @@ func TestHandler_XRealIP(t *testing.T) { }), } - h.cfg.parseCIDRs() + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } assert.NoError(t, h.rr.Start()) defer h.rr.Stop() hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ @@ -1328,15 +1798,27 @@ func TestHandler_XForwardedFor(t *testing.T) { }), } - h.cfg.parseCIDRs() - + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } assert.NoError(t, h.rr.Start()) defer h.rr.Stop() hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ @@ -1379,15 +1861,27 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { }), } - h.cfg.parseCIDRs() - + err := h.cfg.parseCIDRs() + if err != nil { + t.Errorf("error parsing CIDRs: error %v", err) + } assert.NoError(t, h.rr.Start()) defer h.rr.Stop() hs := &http.Server{Addr: "127.0.0.1:8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) body, r, err := getHeader("http://127.0.0.1:8177/", map[string]string{ @@ -1419,13 +1913,26 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { }), } - h.rr.Start() + err := h.rr.Start() + if err != nil { + b.Errorf("error starting the worker pool: error %v", err) + } defer h.rr.Stop() hs := &http.Server{Addr: ":8177", Handler: h} - defer hs.Shutdown(context.Background()) + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + b.Errorf("error during the shutdown: error %v", err) + } + }() - go func() { hs.ListenAndServe() }() + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + b.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) bb := "WORLD" @@ -1434,11 +1941,21 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { if err != nil { b.Fail() } - defer r.Body.Close() - - br, _ := ioutil.ReadAll(r.Body) - if string(br) != bb { - b.Fail() + // Response might be nil here + if r != nil { + br, err := ioutil.ReadAll(r.Body) + if err != nil { + b.Errorf("error reading Body: error %v", err) + } + if string(br) != bb { + b.Fail() + } + err = r.Body.Close() + if err != nil { + b.Errorf("error closing the Body: error %v", err) + } + } else { + b.Errorf("got nil response") } } } diff --git a/service/http/request.go b/service/http/request.go index 98508342..5d91bfb6 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -3,6 +3,7 @@ package http import ( "encoding/json" "fmt" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service/http/attributes" "io/ioutil" @@ -112,21 +113,21 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { } // Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open() { +func (r *Request) Open(log *logrus.Logger) { if r.Uploads == nil { return } - r.Uploads.Open() + r.Uploads.Open(log) } // Close clears all temp file uploads -func (r *Request) Close() { +func (r *Request) Close(log *logrus.Logger) { if r.Uploads == nil { return } - r.Uploads.Clear() + r.Uploads.Clear(log) } // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open diff --git a/service/http/response.go b/service/http/response.go index 166ced82..aafaed13 100644 --- a/service/http/response.go +++ b/service/http/response.go @@ -36,7 +36,10 @@ func (r *Response) Write(w http.ResponseWriter) error { p, h := handlePushHeaders(r.Headers) if pusher, ok := w.(http.Pusher); ok { for _, v := range p { - pusher.Push(v, nil) + err := pusher.Push(v, nil) + if err != nil { + return err + } } } @@ -50,7 +53,10 @@ func (r *Response) Write(w http.ResponseWriter) error { w.WriteHeader(r.Status) if data, ok := r.body.([]byte); ok { - w.Write(data) + _, err := w.Write(data) + if err != nil { + return err + } } if rc, ok := r.body.(io.Reader); ok { diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go index 669b201c..0e4b2c0a 100644 --- a/service/http/rpc_test.go +++ b/service/http/rpc_test.go @@ -49,7 +49,12 @@ func Test_RPC(t *testing.T) { s2, _ := c.Get(rpc.ID) rs := s2.(*rpc.Service) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -110,7 +115,12 @@ func Test_RPC_Unix(t *testing.T) { s2, _ := c.Get(rpc.ID) rs := s2.(*rpc.Service) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -164,7 +174,12 @@ func Test_Workers(t *testing.T) { s2, _ := c.Get(rpc.ID) rs := s2.(*rpc.Service) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() diff --git a/service/http/service.go b/service/http/service.go index 58038acb..fb4b51df 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -2,7 +2,9 @@ package http import ( "context" + "crypto/tls" "fmt" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service/env" "github.com/spiral/roadrunner/service/http/attributes" @@ -31,6 +33,8 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc // Service manages rr, http servers. type Service struct { cfg *Config + log *logrus.Logger + cprod roadrunner.CommandProducer env env.Environment lsns []func(event int, ctx interface{}) mdwr []middleware @@ -48,6 +52,11 @@ func (s *Service) Attach(w roadrunner.Controller) { s.controller = w } +// ProduceCommands changes the default command generator method +func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { + s.cprod = producer +} + // AddMiddleware adds new net/http mdwr. func (s *Service) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) @@ -60,8 +69,9 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) { +func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) { s.cfg = cfg + s.log = log s.env = e if r != nil { @@ -87,6 +97,7 @@ func (s *Service) Serve() error { } } + s.cfg.Workers.CommandProducer = s.cprod s.cfg.Workers.SetEnv("RR_HTTP", "true") s.rr = roadrunner.NewServer(s.cfg.Workers) @@ -132,19 +143,38 @@ func (s *Service) Serve() error { if s.http != nil { go func() { - err <- s.http.ListenAndServe() + httpErr := s.http.ListenAndServe() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } if s.https != nil { go func() { - err <- s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key) + httpErr := s.https.ListenAndServeTLS( + s.cfg.SSL.Cert, + s.cfg.SSL.Key, + ) + + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } if s.fcgi != nil { go func() { - err <- s.serveFCGI() + httpErr := s.serveFCGI() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } @@ -157,15 +187,35 @@ func (s *Service) Stop() { defer s.mu.Unlock() if s.fcgi != nil { - go s.fcgi.Shutdown(context.Background()) + go func() { + err := s.fcgi.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + // Stop() error + // push error from goroutines to the channel and block unil error or success shutdown or timeout + s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) + return + } + }() } if s.https != nil { - go s.https.Shutdown(context.Background()) + go func() { + err := s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) + return + } + }() } if s.http != nil { - go s.http.Shutdown(context.Background()) + go func() { + err := s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) + return + } + }() } } @@ -191,6 +241,10 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + if s.https != nil && r.TLS != nil { + w.Header().Add("Strict-Transport-Security", "max-age=63072000; includeSubDomains") + } + r = attributes.Init(r) // chaining middleware @@ -203,7 +257,13 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Init https server. func (s *Service) initSSL() *http.Server { - server := &http.Server{Addr: s.tlsAddr(s.cfg.Address, true), Handler: s} + server := &http.Server{ + Addr: s.tlsAddr(s.cfg.Address, true), + Handler: s, + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + } s.throw(EventInitSSL, server) return server diff --git a/service/http/service_test.go b/service/http/service_test.go index 69cb7003..c4b2c2c4 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -53,7 +53,8 @@ func Test_Service_NoConfig(t *testing.T) { c := service.NewContainer(logger) c.Register(ID, &Service{}) - c.Init(&testCfg{httpCfg: `{"Enable":true}`}) + err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) + assert.Error(t, err) s, st := c.Get(ID) assert.NotNil(t, s) @@ -138,7 +139,12 @@ func Test_Service_Echo(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -147,7 +153,12 @@ func Test_Service_Echo(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -191,7 +202,12 @@ func Test_Service_Env(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -200,7 +216,12 @@ func Test_Service_Env(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -249,7 +270,12 @@ func Test_Service_ErrorEcho(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -258,7 +284,12 @@ func Test_Service_ErrorEcho(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -304,14 +335,22 @@ func Test_Service_Middleware(t *testing.T) { return func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/halt" { w.WriteHeader(500) - w.Write([]byte("halted")) + _, err := w.Write([]byte("halted")) + if err != nil { + t.Errorf("error writing the data to the http reply: error %v", err) + } } else { f(w, r) } } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -320,7 +359,6 @@ func Test_Service_Middleware(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -329,19 +367,27 @@ func Test_Service_Middleware(t *testing.T) { assert.Equal(t, 201, r.StatusCode) assert.Equal(t, "WORLD", string(b)) + err = r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + req, err = http.NewRequest("GET", "http://localhost:6029/halt", nil) assert.NoError(t, err) r, err = http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() - b, err = ioutil.ReadAll(r.Body) assert.NoError(t, err) assert.NoError(t, err) assert.Equal(t, 500, r.StatusCode) assert.Equal(t, "halted", string(b)) + + err = r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } } func Test_Service_Listener(t *testing.T) { @@ -381,7 +427,12 @@ func Test_Service_Listener(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) c.Stop() diff --git a/service/http/ssl_test.go b/service/http/ssl_test.go index 63eb90b1..c9b4d090 100644 --- a/service/http/ssl_test.go +++ b/service/http/ssl_test.go @@ -47,7 +47,12 @@ func Test_SSL_Service_Echo(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -56,7 +61,12 @@ func Test_SSL_Service_Echo(t *testing.T) { r, err := sslClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("fail to close the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -93,7 +103,12 @@ func Test_SSL_Service_NoRedirect(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -102,7 +117,12 @@ func Test_SSL_Service_NoRedirect(t *testing.T) { r, err := sslClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("fail to close the Body: error %v", err) + } + }() assert.Nil(t, r.TLS) @@ -142,7 +162,12 @@ func Test_SSL_Service_Redirect(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -151,7 +176,12 @@ func Test_SSL_Service_Redirect(t *testing.T) { r, err := sslClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("fail to close the Body: error %v", err) + } + }() assert.NotNil(t, r.TLS) @@ -191,7 +221,12 @@ func Test_SSL_Service_Push(t *testing.T) { // should do nothing s.(*Service).Stop() - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -200,7 +235,12 @@ func Test_SSL_Service_Push(t *testing.T) { r, err := sslClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("fail to close the Body: error %v", err) + } + }() assert.NotNil(t, r.TLS) diff --git a/service/http/uploads.go b/service/http/uploads.go index 7610ab28..8a46f230 100644 --- a/service/http/uploads.go +++ b/service/http/uploads.go @@ -2,6 +2,8 @@ package http import ( "encoding/json" + "fmt" + "github.com/sirupsen/logrus" "io" "io/ioutil" "mime/multipart" @@ -45,13 +47,16 @@ func (u *Uploads) MarshalJSON() ([]byte, error) { // Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors // will be handled individually. -func (u *Uploads) Open() { +func (u *Uploads) Open(log *logrus.Logger) { var wg sync.WaitGroup for _, f := range u.list { wg.Add(1) go func(f *FileUpload) { defer wg.Done() - f.Open(u.cfg) + err := f.Open(u.cfg) + if err != nil && log != nil { + log.Error(fmt.Errorf("error opening the file: error %v", err)) + } }(f) } @@ -59,10 +64,13 @@ func (u *Uploads) Open() { } // Clear deletes all temporary files. -func (u *Uploads) Clear() { +func (u *Uploads) Clear(log *logrus.Logger) { for _, f := range u.list { if f.TempFilename != "" && exists(f.TempFilename) { - os.Remove(f.TempFilename) + err := os.Remove(f.TempFilename) + if err != nil && log != nil { + log.Error(fmt.Errorf("error removing the file: error %v", err)) + } } } } @@ -99,7 +107,13 @@ func NewUpload(f *multipart.FileHeader) *FileUpload { } // Open moves file content into temporary file available for PHP. -func (f *FileUpload) Open(cfg *UploadsConfig) error { +// NOTE: +// There is 2 deferred functions, and in case of getting 2 errors from both functions +// error from close of temp file would be overwritten by error from the main file +// STACK +// DEFER FILE CLOSE (2) +// DEFER TMP CLOSE (1) +func (f *FileUpload) Open(cfg *UploadsConfig) (err error) { if cfg.Forbids(f.Name) { f.Error = UploadErrorExtension return nil @@ -110,7 +124,11 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error { f.Error = UploadErrorNoFile return err } - defer file.Close() + + defer func() { + // close the main file + err = file.Close() + }() tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") if err != nil { @@ -120,7 +138,10 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error { } f.TempFilename = tmp.Name() - defer tmp.Close() + defer func() { + // close the temp file + err = tmp.Close() + }() if f.Size, err = io.Copy(tmp, file); err != nil { f.Error = UploadErrorCantWrite @@ -131,10 +152,8 @@ func (f *FileUpload) Open(cfg *UploadsConfig) error { // exists if file exists. func exists(path string) bool { - _, err := os.Stat(path) - if err == nil { - return true + if _, err := os.Stat(path); os.IsNotExist(err) { + return false } - - return false + return true } diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go index 0fbf0e14..1890c02b 100644 --- a/service/http/uploads_test.go +++ b/service/http/uploads_test.go @@ -6,6 +6,7 @@ import ( "crypto/md5" "encoding/hex" "encoding/json" + "fmt" "github.com/spiral/roadrunner" "github.com/stretchr/testify/assert" "io" @@ -41,22 +42,43 @@ func TestHandler_Upload_File(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8021", Handler: h} - defer hs.Shutdown(context.Background()) - - go func() { hs.ListenAndServe() }() + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) f := mustOpen("uploads_test.go") - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() fw, err := w.CreateFormFile("upload", f.Name()) assert.NotNil(t, fw) assert.NoError(t, err) - io.Copy(fw, f) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } - w.Close() + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -65,7 +87,12 @@ func TestHandler_Upload_File(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -102,22 +129,43 @@ func TestHandler_Upload_NestedFile(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8021", Handler: h} - defer hs.Shutdown(context.Background()) - - go func() { hs.ListenAndServe() }() + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) f := mustOpen("uploads_test.go") - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() fw, err := w.CreateFormFile("upload[x][y][z][]", f.Name()) assert.NotNil(t, fw) assert.NoError(t, err) - io.Copy(fw, f) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } - w.Close() + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -126,7 +174,12 @@ func TestHandler_Upload_NestedFile(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -163,22 +216,43 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8021", Handler: h} - defer hs.Shutdown(context.Background()) - - go func() { hs.ListenAndServe() }() + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) f := mustOpen("uploads_test.go") - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() fw, err := w.CreateFormFile("upload", f.Name()) assert.NotNil(t, fw) assert.NoError(t, err) - io.Copy(fw, f) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } - w.Close() + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -187,7 +261,12 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -224,22 +303,43 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { defer h.rr.Stop() hs := &http.Server{Addr: ":8021", Handler: h} - defer hs.Shutdown(context.Background()) - - go func() { hs.ListenAndServe() }() + defer func() { + err := hs.Shutdown(context.Background()) + if err != nil { + t.Errorf("error during the shutdown: error %v", err) + } + }() + + go func() { + err := hs.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + t.Errorf("error listening the interface: error %v", err) + } + }() time.Sleep(time.Millisecond * 10) var mb bytes.Buffer w := multipart.NewWriter(&mb) f := mustOpen("uploads_test.go") - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + t.Errorf("failed to close a file: error %v", err) + } + }() fw, err := w.CreateFormFile("upload", f.Name()) assert.NotNil(t, fw) assert.NoError(t, err) - io.Copy(fw, f) + _, err = io.Copy(fw, f) + if err != nil { + t.Errorf("error copying the file: error %v", err) + } - w.Close() + err = w.Close() + if err != nil { + t.Errorf("error closing the file: error %v", err) + } req, err := http.NewRequest("POST", "http://localhost"+hs.Addr, &mb) assert.NoError(t, err) @@ -248,7 +348,12 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error closing the Body: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -282,28 +387,47 @@ type fInfo struct { MD5 string `json:"md5,omitempty"` } -func fileString(f string, err int, mime string) string { - s, _ := os.Stat(f) +func fileString(f string, errNo int, mime string) string { + s, err := os.Stat(f) + if err != nil { + fmt.Println(fmt.Errorf("error stat the file, error: %v", err)) + } + + ff, err := os.Open(f) + if err != nil { + fmt.Println(fmt.Errorf("error opening the file, error: %v", err)) + } + + defer func() { + er := ff.Close() + if er != nil { + fmt.Println(fmt.Errorf("error closing the file, error: %v", er)) + } + }() - ff, _ := os.Open(f) - defer ff.Close() h := md5.New() - io.Copy(h, ff) + _, err = io.Copy(h, ff) + if err != nil { + fmt.Println(fmt.Errorf("error copying the file, error: %v", err)) + } v := &fInfo{ Name: s.Name(), Size: s.Size(), - Error: err, + Error: errNo, Mime: mime, MD5: hex.EncodeToString(h.Sum(nil)), } - if err != 0 { + if errNo != 0 { v.MD5 = "" v.Size = 0 } - r, _ := json.Marshal(v) + r, err := json.Marshal(v) + if err != nil { + fmt.Println(fmt.Errorf("error marshalling fInfo, error: %v", err)) + } return string(r) } diff --git a/service/limit/config_test.go b/service/limit/config_test.go index b8a6c0aa..b388791f 100644 --- a/service/limit/config_test.go +++ b/service/limit/config_test.go @@ -31,7 +31,10 @@ func Test_Controller_Default(t *testing.T) { } `} c := &Config{} - c.InitDefaults() + err := c.InitDefaults() + if err != nil { + t.Errorf("failed to InitDefaults: error %v", err) + } assert.NoError(t, c.Hydrate(cfg)) assert.Equal(t, time.Second, c.Interval) diff --git a/service/limit/controller.go b/service/limit/controller.go index c11f4b91..24a158f7 100644 --- a/service/limit/controller.go +++ b/service/limit/controller.go @@ -66,7 +66,12 @@ func (c *controller) control(p roadrunner.Pool) { // make sure worker still on initial request if p.Remove(w, err) && w.State().NumExecs() == eID { - go w.Kill() + go func() { + err := w.Kill() + if err != nil { + fmt.Printf("error killing worker with PID number: %d, created: %s", w.Pid, w.Created) + } + }() c.report(EventExecTTL, w, err) } } diff --git a/service/limit/service_test.go b/service/limit/service_test.go index ade4abcc..8cb3d7dc 100644 --- a/service/limit/service_test.go +++ b/service/limit/service_test.go @@ -75,7 +75,12 @@ func Test_Service_PidEcho(t *testing.T) { s, _ := c.Get(rrhttp.ID) assert.NotNil(t, s) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -84,7 +89,12 @@ func Test_Service_PidEcho(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -129,7 +139,12 @@ func Test_Service_ListenerPlusTTL(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -140,7 +155,12 @@ func Test_Service_ListenerPlusTTL(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -194,7 +214,12 @@ func Test_Service_ListenerPlusIdleTTL(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -205,7 +230,12 @@ func Test_Service_ListenerPlusIdleTTL(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) - defer r.Body.Close() + defer func() { + err := r.Body.Close() + if err != nil { + t.Errorf("error during the body closing: error %v", err) + } + }() b, err := ioutil.ReadAll(r.Body) assert.NoError(t, err) @@ -261,7 +291,12 @@ func Test_Service_Listener_MaxExecTTL(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -313,7 +348,12 @@ func Test_Service_Listener_MaxMemoryUsage(t *testing.T) { } }) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -325,11 +365,17 @@ func Test_Service_Listener_MaxMemoryUsage(t *testing.T) { for { select { case <-captured: - http.DefaultClient.Do(req) + _, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("error during sending the http request: error %v", err) + } assert.NotEqual(t, lastPID, getPID(s)) return default: - http.DefaultClient.Do(req) + _, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("error during sending the http request: error %v", err) + } } } } diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go index ee8ef984..9e7de640 100644 --- a/service/metrics/rpc.go +++ b/service/metrics/rpc.go @@ -32,26 +32,26 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { return fmt.Errorf("undefined collector `%s`", m.Name) } - switch c.(type) { + switch c := c.(type) { case prometheus.Gauge: - c.(prometheus.Gauge).Add(m.Value) + c.Add(m.Value) case *prometheus.GaugeVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value) + c.WithLabelValues(m.Labels...).Add(m.Value) case prometheus.Counter: - c.(prometheus.Counter).Add(m.Value) + c.Add(m.Value) case *prometheus.CounterVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value) + c.WithLabelValues(m.Labels...).Add(m.Value) default: return fmt.Errorf("collector `%s` does not support method `Add`", m.Name) @@ -74,16 +74,16 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { return fmt.Errorf("undefined collector `%s`", m.Name) } - switch c.(type) { + switch c := c.(type) { case prometheus.Gauge: - c.(prometheus.Gauge).Sub(m.Value) + c.Sub(m.Value) case *prometheus.GaugeVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value) + c.WithLabelValues(m.Labels...).Sub(m.Value) default: return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) } @@ -105,23 +105,23 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) { return fmt.Errorf("undefined collector `%s`", m.Name) } - switch c.(type) { + switch c := c.(type) { case *prometheus.SummaryVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value) + c.WithLabelValues(m.Labels...).Observe(m.Value) case prometheus.Histogram: - c.(prometheus.Histogram).Observe(m.Value) + c.Observe(m.Value) case *prometheus.HistogramVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value) + c.WithLabelValues(m.Labels...).Observe(m.Value) default: return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) } @@ -143,16 +143,16 @@ func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { return fmt.Errorf("undefined collector `%s`", m.Name) } - switch c.(type) { + switch c := c.(type) { case prometheus.Gauge: - c.(prometheus.Gauge).Set(m.Value) + c.Set(m.Value) case *prometheus.GaugeVec: if len(m.Labels) == 0 { return fmt.Errorf("required labels for collector `%s`", m.Name) } - c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value) + c.WithLabelValues(m.Labels...).Set(m.Value) default: return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go index 6d061f1d..3fe48818 100644 --- a/service/metrics/rpc_test.go +++ b/service/metrics/rpc_test.go @@ -42,11 +42,19 @@ func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.C assert.True(t, s.(*Service).Enabled()) - go func() { c.Serve() }() - time.Sleep(time.Millisecond * 100) + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 200) client, err := rs.Client() assert.NoError(t, err) + if err != nil { + panic(err) + } return client, c } @@ -80,7 +88,7 @@ func Test_Set_RPC_Vector(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2113", ) defer c.Stop() @@ -92,7 +100,7 @@ func Test_Set_RPC_Vector(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2112/metrics") + out, _, err := get("http://localhost:2113/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) } @@ -104,7 +112,7 @@ func Test_Set_RPC_CollectorError(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2114", ) defer c.Stop() @@ -123,7 +131,7 @@ func Test_Set_RPC_MetricError(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2115", ) defer c.Stop() @@ -142,7 +150,7 @@ func Test_Set_RPC_MetricError_2(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2116", ) defer c.Stop() @@ -160,7 +168,7 @@ func Test_Set_RPC_MetricError_3(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2117", ) defer c.Stop() @@ -179,7 +187,7 @@ func Test_Sub_RPC(t *testing.T) { `"user_gauge":{ "type": "gauge" }`, - "2113", + "2118", ) defer c.Stop() @@ -196,7 +204,7 @@ func Test_Sub_RPC(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2113/metrics") + out, _, err := get("http://localhost:2118/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_gauge 90`) } @@ -208,7 +216,7 @@ func Test_Sub_RPC_Vector(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2114", + "2119", ) defer c.Stop() @@ -227,7 +235,7 @@ func Test_Sub_RPC_Vector(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2114/metrics") + out, _, err := get("http://localhost:2119/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_gauge{section="first",type="core"} 90`) } @@ -239,7 +247,7 @@ func Test_Sub_RPC_CollectorError(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2120", ) defer c.Stop() @@ -258,7 +266,7 @@ func Test_Sub_RPC_MetricError(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2121", ) defer c.Stop() @@ -277,7 +285,7 @@ func Test_Sub_RPC_MetricError_2(t *testing.T) { "type": "gauge", "labels": ["type", "section"] }`, - "2112", + "2122", ) defer c.Stop() @@ -295,7 +303,7 @@ func Test_Sub_RPC_MetricError_3(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2123", ) defer c.Stop() @@ -314,7 +322,7 @@ func Test_Observe_RPC(t *testing.T) { `"user_histogram":{ "type": "histogram" }`, - "2116", + "2124", ) defer c.Stop() @@ -325,7 +333,7 @@ func Test_Observe_RPC(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2116/metrics") + out, _, err := get("http://localhost:2124/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_histogram`) } @@ -337,7 +345,7 @@ func Test_Observe_RPC_Vector(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2117", + "2125", ) defer c.Stop() @@ -349,7 +357,7 @@ func Test_Observe_RPC_Vector(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2117/metrics") + out, _, err := get("http://localhost:2125/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_histogram`) } @@ -361,7 +369,7 @@ func Test_Observe_RPC_CollectorError(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2126", ) defer c.Stop() @@ -380,7 +388,7 @@ func Test_Observe_RPC_MetricError(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2127", ) defer c.Stop() @@ -399,7 +407,7 @@ func Test_Observe_RPC_MetricError_2(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2128", ) defer c.Stop() @@ -418,7 +426,7 @@ func Test_Observe2_RPC(t *testing.T) { `"user_histogram":{ "type": "summary" }`, - "2118", + "2129", ) defer c.Stop() @@ -429,7 +437,7 @@ func Test_Observe2_RPC(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2118/metrics") + out, _, err := get("http://localhost:2129/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_histogram`) } @@ -440,7 +448,7 @@ func Test_Observe2_RPC_Invalid(t *testing.T) { `"user_histogram":{ "type": "summary" }`, - "2112", + "2130", ) defer c.Stop() @@ -458,7 +466,7 @@ func Test_Observe2_RPC_Invalid_2(t *testing.T) { `"user_histogram":{ "type": "gauge" }`, - "2112", + "2131", ) defer c.Stop() @@ -476,7 +484,7 @@ func Test_Observe2_RPC_Vector(t *testing.T) { "type": "summary", "labels": ["type", "section"] }`, - "2119", + "2132", ) defer c.Stop() @@ -488,7 +496,7 @@ func Test_Observe2_RPC_Vector(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2119/metrics") + out, _, err := get("http://localhost:2132/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_histogram`) } @@ -500,7 +508,7 @@ func Test_Observe2_RPC_CollectorError(t *testing.T) { "type": "summary", "labels": ["type", "section"] }`, - "2112", + "2133", ) defer c.Stop() @@ -519,7 +527,7 @@ func Test_Observe2_RPC_MetricError(t *testing.T) { "type": "summary", "labels": ["type", "section"] }`, - "2112", + "2134", ) defer c.Stop() @@ -538,7 +546,7 @@ func Test_Observe2_RPC_MetricError_2(t *testing.T) { "type": "summary", "labels": ["type", "section"] }`, - "2112", + "2135", ) defer c.Stop() @@ -556,7 +564,7 @@ func Test_Add_RPC(t *testing.T) { `"user_gauge":{ "type": "counter" }`, - "2120", + "2136", ) defer c.Stop() @@ -567,7 +575,7 @@ func Test_Add_RPC(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2120/metrics") + out, _, err := get("http://localhost:2136/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_gauge 100`) } @@ -579,7 +587,7 @@ func Test_Add_RPC_Vector(t *testing.T) { "type": "counter", "labels": ["type", "section"] }`, - "2121", + "2137", ) defer c.Stop() @@ -591,7 +599,7 @@ func Test_Add_RPC_Vector(t *testing.T) { }, &ok)) assert.True(t, ok) - out, _, err := get("http://localhost:2121/metrics") + out, _, err := get("http://localhost:2137/metrics") assert.NoError(t, err) assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) } @@ -603,7 +611,7 @@ func Test_Add_RPC_CollectorError(t *testing.T) { "type": "counter", "labels": ["type", "section"] }`, - "2112", + "2138", ) defer c.Stop() @@ -622,7 +630,7 @@ func Test_Add_RPC_MetricError(t *testing.T) { "type": "counter", "labels": ["type", "section"] }`, - "2112", + "2139", ) defer c.Stop() @@ -641,7 +649,7 @@ func Test_Add_RPC_MetricError_2(t *testing.T) { "type": "counter", "labels": ["type", "section"] }`, - "2112", + "2140", ) defer c.Stop() @@ -659,7 +667,7 @@ func Test_Add_RPC_MetricError_3(t *testing.T) { "type": "histogram", "labels": ["type", "section"] }`, - "2112", + "2141", ) defer c.Stop() diff --git a/service/metrics/service.go b/service/metrics/service.go index 4916b3e0..6fa4da50 100644 --- a/service/metrics/service.go +++ b/service/metrics/service.go @@ -1,9 +1,13 @@ package metrics +// todo: declare metric at runtime + import ( "context" + "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" @@ -15,6 +19,7 @@ const ID = "metrics" // Service to manage application metrics using Prometheus. type Service struct { cfg *Config + log *logrus.Logger mu sync.Mutex http *http.Server collectors sync.Map @@ -22,8 +27,9 @@ type Service struct { } // Init service. -func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { +func (s *Service) Init(cfg *Config, r *rpc.Service, log *logrus.Logger) (bool, error) { s.cfg = cfg + s.log = log s.registry = prometheus.NewRegistry() s.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) @@ -76,7 +82,12 @@ func (s *Service) Serve() error { )} s.mu.Unlock() - return s.http.ListenAndServe() + err = s.http.ListenAndServe() + if err == nil || err == http.ErrServerClosed { + return nil + } + + return err } // Stop prometheus metrics service. @@ -86,7 +97,13 @@ func (s *Service) Stop() { if s.http != nil { // gracefully stop server - go s.http.Shutdown(context.Background()) + go func() { + err := s.http.Shutdown(context.Background()) + if err != nil { + // Function should be Stop() error + s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err)) + } + }() } } diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go index 0cf6fd95..62e6f6d7 100644 --- a/service/metrics/service_test.go +++ b/service/metrics/service_test.go @@ -43,9 +43,16 @@ func get(url string) (string, *http.Response, error) { if err != nil { return "", nil, err } - defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } return string(b), r, err } @@ -63,7 +70,12 @@ func TestService_Serve(t *testing.T) { s, _ := c.Get(ID) assert.NotNil(t, s) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -94,7 +106,12 @@ func Test_ServiceCustomMetric(t *testing.T) { assert.NoError(t, s.(*Service).Register(collector)) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -127,7 +144,12 @@ func Test_ServiceCustomMetricMust(t *testing.T) { s.(*Service).MustRegister(collector) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -160,7 +182,12 @@ func Test_ConfiguredMetric(t *testing.T) { assert.True(t, s.(*Service).Enabled()) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index af261698..c65e7415 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -40,7 +40,12 @@ func TestConfig_Listener(t *testing.T) { ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) - defer ln.Close() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() assert.Equal(t, "tcp", ln.Addr().Network()) assert.Equal(t, "[::]:18001", ln.Addr().String()) @@ -56,7 +61,12 @@ func TestConfig_ListenerUnix(t *testing.T) { ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) - defer ln.Close() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() assert.Equal(t, "unix", ln.Addr().Network()) assert.Equal(t, "file.sock", ln.Addr().String()) @@ -71,7 +81,7 @@ func Test_Config_Error(t *testing.T) { ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) - assert.Equal(t, "Invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) + assert.Equal(t, "invalid DSN (tcp://:6001, unix://file.sock)", err.Error()) } func Test_Config_ErrorMethod(t *testing.T) { @@ -86,12 +96,22 @@ func TestConfig_Dialer(t *testing.T) { cfg := &Config{Listen: "tcp://:18001"} ln, _ := cfg.Listener() - defer ln.Close() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) - defer conn.Close() + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() assert.Equal(t, "tcp", conn.RemoteAddr().Network()) assert.Equal(t, "127.0.0.1:18001", conn.RemoteAddr().String()) @@ -105,12 +125,22 @@ func TestConfig_DialerUnix(t *testing.T) { cfg := &Config{Listen: "unix://file.sock"} ln, _ := cfg.Listener() - defer ln.Close() + defer func() { + err := ln.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) - defer conn.Close() + defer func() { + err := conn.Close() + if err != nil { + t.Errorf("error closing the connection: error %v", err) + } + }() assert.Equal(t, "unix", conn.RemoteAddr().Network()) assert.Equal(t, "file.sock", conn.RemoteAddr().String()) @@ -138,7 +168,10 @@ func Test_Config_DialerErrorMethod(t *testing.T) { func Test_Config_Defaults(t *testing.T) { c := &Config{} - c.InitDefaults() + err := c.InitDefaults() + if err != nil { + t.Errorf("error during the InitDefaults: error %v", err) + } assert.Equal(t, true, c.Enable) assert.Equal(t, "tcp://127.0.0.1:6001", c.Listen) } diff --git a/service/static/service_test.go b/service/static/service_test.go index d345b138..309804cc 100644 --- a/service/static/service_test.go +++ b/service/static/service_test.go @@ -41,9 +41,17 @@ func get(url string) (string, *http.Response, error) { if err != nil { return "", nil, err } - defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } + return string(b), r, err } @@ -76,7 +84,12 @@ func Test_Files(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -129,7 +142,12 @@ func Test_Files_Disable(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -226,7 +244,12 @@ func Test_Files_Forbid(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -263,7 +286,12 @@ func Test_Files_Always(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -300,7 +328,12 @@ func Test_Files_NotFound(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -337,7 +370,12 @@ func Test_Files_Dir(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -374,7 +412,12 @@ func Test_Files_NotForbid(t *testing.T) { } }`})) - go func() { c.Serve() }() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() time.Sleep(time.Millisecond * 100) defer c.Stop() @@ -391,10 +434,17 @@ func tmpDir() string { func all(fn string) string { f, _ := os.Open(fn) - defer f.Close() b := &bytes.Buffer{} - io.Copy(b, f) + _, err := io.Copy(b, f) + if err != nil { + return "" + } + + err = f.Close() + if err != nil { + return "" + } return b.String() } diff --git a/socket_factory.go b/socket_factory.go index 43059e8a..84515f64 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -51,7 +51,12 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { rl, err := f.findRelay(w, f.tout) if err != nil { - go func(w *Worker) { w.Kill() }(w) + go func(w *Worker) { + err := w.Kill() + if err != nil { + fmt.Println(fmt.Errorf("error killing the worker %v", err)) + } + }(w) if wErr := w.Wait(); wErr != nil { if _, ok := wErr.(*exec.ExitError); ok { diff --git a/socket_factory_test.go b/socket_factory_test.go index 56d9313e..8beb3fc6 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -14,7 +14,12 @@ func Test_Tcp_Start(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -29,7 +34,10 @@ func Test_Tcp_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - w.Stop() + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } } func Test_Tcp_StartCloseFactory(t *testing.T) { @@ -44,7 +52,12 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") f := NewSocketFactory(ls, time.Minute) - defer f.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() w, err := f.SpawnWorker(cmd) assert.NoError(t, err) @@ -54,7 +67,10 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { assert.NoError(t, w.Wait()) }() - w.Stop() + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } } func Test_Tcp_StartError(t *testing.T) { @@ -62,13 +78,21 @@ func Test_Tcp_StartError(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - cmd.Start() + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) assert.Error(t, err) @@ -80,7 +104,12 @@ func Test_Tcp_Failboot(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -98,7 +127,12 @@ func Test_Tcp_Timeout(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -116,7 +150,12 @@ func Test_Tcp_Invalid(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -133,7 +172,12 @@ func Test_Tcp_Broken(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -147,7 +191,11 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() - defer w.Stop() + + defer func() { + err = w.Stop() + assert.Error(t, err) + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -160,7 +208,12 @@ func Test_Tcp_Echo(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -171,7 +224,12 @@ func Test_Tcp_Echo(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -190,7 +248,12 @@ func Test_Unix_Start(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -205,7 +268,10 @@ func Test_Unix_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - w.Stop() + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } } func Test_Unix_Failboot(t *testing.T) { @@ -215,7 +281,12 @@ func Test_Unix_Failboot(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -235,7 +306,12 @@ func Test_Unix_Timeout(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -255,7 +331,12 @@ func Test_Unix_Invalid(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -274,7 +355,12 @@ func Test_Unix_Broken(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -284,11 +370,14 @@ func Test_Unix_Broken(t *testing.T) { w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { err := w.Wait() - assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() - defer w.Stop() + + defer func() { + err = w.Stop() + assert.Error(t, err) + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -303,7 +392,12 @@ func Test_Unix_Echo(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() } else { t.Skip("socket is busy") } @@ -314,7 +408,12 @@ func Test_Unix_Echo(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -329,7 +428,12 @@ func Test_Unix_Echo(t *testing.T) { func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { ls, err := net.Listen("tcp", "localhost:9007") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() } else { b.Skip("socket is busy") } @@ -345,14 +449,22 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { } }() - w.Stop() + err = w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } } } func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { ls, err := net.Listen("tcp", "localhost:9007") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() } else { b.Skip("socket is busy") } @@ -361,9 +473,17 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { - w.Wait() + err := w.Wait() + if err != nil { + b.Errorf("error waiting: %v", err) + } + }() + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } }() - defer w.Stop() for n := 0; n < b.N; n++ { if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { @@ -379,7 +499,12 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() } else { b.Skip("socket is busy") } @@ -395,7 +520,10 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { } }() - w.Stop() + err = w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } } } @@ -406,7 +534,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { - defer ls.Close() + defer func() { + err := ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() } else { b.Skip("socket is busy") } @@ -415,9 +548,17 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) go func() { - w.Wait() + err := w.Wait() + if err != nil { + b.Errorf("error waiting: %v", err) + } + }() + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } }() - defer w.Stop() for n := 0; n < b.N; n++ { if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { diff --git a/static_pool.go b/static_pool.go index 66b1366e..2186227b 100644 --- a/static_pool.go +++ b/static_pool.go @@ -42,7 +42,6 @@ type StaticPool struct { workers []*Worker // invalid declares set of workers to be removed from the pool. - mur sync.Mutex remove sync.Map // pool is being destroyed @@ -108,9 +107,7 @@ func (p *StaticPool) Workers() (workers []*Worker) { p.muw.RLock() defer p.muw.RUnlock() - for _, w := range p.workers { - workers = append(workers, w) - } + workers = append(workers, p.workers...) return workers } @@ -294,7 +291,12 @@ func (p *StaticPool) discardWorker(w *Worker, caused interface{}) { // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) { - go w.Stop() + go func() { + err := w.Stop() + if err != nil { + p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } + }() select { case <-w.waitDone: diff --git a/static_pool_test.go b/static_pool_test.go index a7e71fdb..1f185f58 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -6,6 +6,7 @@ import ( "os/exec" "runtime" "strconv" + "strings" "sync" "testing" "time" @@ -23,12 +24,13 @@ func Test_NewPool(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) + assert.Equal(t, cfg, p.Config()) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) } func Test_StaticPool_Invalid(t *testing.T) { @@ -62,10 +64,11 @@ func Test_StaticPool_Echo(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) + defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) res, err := p.Exec(&Payload{Body: []byte("hello")}) @@ -83,10 +86,11 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) + defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil}) @@ -104,10 +108,11 @@ func Test_StaticPool_Echo_Context(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) + defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")}) @@ -125,10 +130,10 @@ func Test_StaticPool_JobError(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) res, err := p.Exec(&Payload{Body: []byte("hello")}) @@ -145,14 +150,17 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) + done := make(chan interface{}) p.Listen(func(e int, ctx interface{}) { if err, ok := ctx.(error); ok { - assert.Contains(t, err.Error(), "undefined_function()") + if strings.Contains(err.Error(), "undefined_function()") { + close(done) + } } }) @@ -160,6 +168,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) + + <-done } func Test_StaticPool_Broken_FromOutside(t *testing.T) { @@ -168,10 +178,10 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { NewPipeFactory(), cfg, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) res, err := p.Exec(&Payload{Body: []byte("hello")}) @@ -191,9 +201,10 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { }) // killing random worker and expecting pool to replace it - p.muw.Lock() - p.workers[0].cmd.Process.Kill() - p.muw.Unlock() + err = p.Workers()[0].cmd.Process.Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } <-destructed for _, w := range p.Workers() { @@ -244,10 +255,10 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { DestroyTimeout: time.Second, }, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) var lastPID string lastPID = strconv.Itoa(*p.Workers()[0].Pid) @@ -279,10 +290,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { DestroyTimeout: time.Second, }, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) var lastPID string lastPID = strconv.Itoa(*p.Workers()[0].Pid) @@ -338,7 +349,13 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - go p.Exec(&Payload{Body: []byte("100")}) + go func() { + _, err := p.Exec(&Payload{Body: []byte("100")}) + if err != nil { + t.Errorf("error executing payload: error %v", err) + } + + }() time.Sleep(time.Millisecond * 10) p.Destroy() @@ -357,10 +374,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { DestroyTimeout: time.Second, }, ) + assert.NoError(t, err) defer p.Destroy() assert.NotNil(t, p) - assert.NoError(t, err) for _, w := range p.workers { w.state.value = StateErrored diff --git a/util/network.go b/util/network.go index 70e38fdb..b9066de7 100644 --- a/util/network.go +++ b/util/network.go @@ -2,7 +2,9 @@ package util import ( "errors" + "fmt" "net" + "os" "strings" "syscall" ) @@ -11,16 +13,29 @@ import ( func CreateListener(address string) (net.Listener, error) { dsn := strings.Split(address, "://") if len(dsn) != 2 { - return nil, errors.New("Invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") } if dsn[0] != "unix" && dsn[0] != "tcp" { - return nil, errors.New("Invalid Protocol (tcp://:6001, unix://file.sock)") + return nil, errors.New("invalid Protocol (tcp://:6001, unix://file.sock)") } - if dsn[0] == "unix" { - syscall.Unlink(dsn[1]) + if dsn[0] == "unix" && fileExists(dsn[1]) { + err := syscall.Unlink(dsn[1]) + if err != nil { + return nil, fmt.Errorf("error during the unlink syscall: error %v", err) + } } return net.Listen(dsn[0], dsn[1]) } + +// fileExists checks if a file exists and is not a directory before we +// try using it to prevent further errors. +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} @@ -6,7 +6,6 @@ import ( "github.com/spiral/goridge" "os" "os/exec" - "runtime" "strconv" "strings" "sync" @@ -102,12 +101,6 @@ func (w *Worker) Wait() error { w.mu.Lock() defer w.mu.Unlock() - if runtime.GOOS != "windows" { - // windows handles processes and close pipes differently, - // we can ignore wait here as process.Wait() already being handled above - w.cmd.Wait() - } - if w.endState.Success() { w.state.set(StateStopped) return nil @@ -214,10 +207,16 @@ func (w *Worker) start() error { defer w.mu.Unlock() if w.rl != nil { - w.rl.Close() + err := w.rl.Close() + if err != nil { + w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } } - w.err.Close() + err := w.err.Close() + if err != nil { + w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err}) + } } }() diff --git a/worker_test.go b/worker_test.go index c357b6e0..e8cbef90 100644 --- a/worker_test.go +++ b/worker_test.go @@ -19,7 +19,10 @@ func Test_GetState(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, StateReady, w.State().Value()) - w.Stop() + err = w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } } func Test_Kill(t *testing.T) { @@ -35,7 +38,12 @@ func Test_Kill(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, StateReady, w.State().Value()) - w.Kill() + defer func() { + err := w.Kill() + if err != nil { + t.Errorf("error killing the worker: error %v", err) + } + }() } func Test_Echo(t *testing.T) { @@ -45,7 +53,12 @@ func Test_Echo(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -64,7 +77,12 @@ func Test_BadPayload(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(nil) @@ -103,7 +121,12 @@ func Test_String(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() assert.Contains(t, w.String(), "php tests/client.php echo pipes") assert.Contains(t, w.String(), "ready") @@ -117,7 +140,12 @@ func Test_Echo_Slow(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -138,7 +166,11 @@ func Test_Broken(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() - defer w.Stop() + + defer func() { + err := w.Stop() + assert.Error(t, err) + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) assert.Nil(t, res) @@ -163,7 +195,13 @@ func Test_Error(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() res, err := w.Exec(&Payload{Body: []byte("hello")}) assert.Nil(t, res) @@ -180,14 +218,28 @@ func Test_NumExecs(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() - defer w.Stop() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the worker: error %v", err) + } + }() - w.Exec(&Payload{Body: []byte("hello")}) + _, err := w.Exec(&Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } assert.Equal(t, int64(1), w.State().NumExecs()) - w.Exec(&Payload{Body: []byte("hello")}) + _, err = w.Exec(&Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } assert.Equal(t, int64(2), w.State().NumExecs()) - w.Exec(&Payload{Body: []byte("hello")}) + _, err = w.Exec(&Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } assert.Equal(t, int64(3), w.State().NumExecs()) } |