diff options
author | Anton Titov <[email protected]> | 2019-12-23 14:52:06 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2019-12-23 14:52:06 +0300 |
commit | 921e1f55e23ab75b8250045916c8d1ffad1b8bde (patch) | |
tree | 00b16331b9ff3b3b846ba22989dddde721cc959d | |
parent | 921354df1aa4687837e3ba6ac0eb04d39321c149 (diff) | |
parent | 2093cb9058f94668fff0a97beb76b0cab66c7b63 (diff) |
Merge branch 'master' into Fix_warning_and_issues
-rw-r--r-- | CHANGELOG.md | 5 | ||||
-rwxr-xr-x | build.sh | 2 | ||||
-rw-r--r-- | pipe_factory_test.go | 4 | ||||
-rw-r--r-- | server_config.go | 40 | ||||
-rw-r--r-- | server_config_test.go | 15 | ||||
-rw-r--r-- | service/container.go | 13 | ||||
-rw-r--r-- | service/container_test.go | 19 | ||||
-rw-r--r-- | service/health/service.go | 18 | ||||
-rw-r--r-- | service/http/handler.go | 6 | ||||
-rw-r--r-- | service/http/handler_test.go | 62 | ||||
-rw-r--r-- | service/http/request.go | 9 | ||||
-rw-r--r-- | service/http/service.go | 50 | ||||
-rw-r--r-- | service/http/service_test.go | 4 | ||||
-rw-r--r-- | service/http/uploads.go | 16 | ||||
-rw-r--r-- | service/http/uploads_test.go | 8 | ||||
-rw-r--r-- | service/metrics/rpc_test.go | 5 | ||||
-rw-r--r-- | service/metrics/service.go | 17 | ||||
-rw-r--r-- | socket_factory_test.go | 5 | ||||
-rw-r--r-- | static_pool_test.go | 10 | ||||
-rw-r--r-- | worker.go | 10 | ||||
-rw-r--r-- | worker_test.go | 6 |
21 files changed, 217 insertions, 107 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index a40a7096..12924618 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +v1.5.2 (05.12.2019) +------------------- +- added support for symfony/console 5.0 by @coxa +- added support for HTTP2 trailers by @filakhtov + v1.5.1 (22.10.2019) ------------------- - bugfix: do not halt stop sequence in case of service error @@ -3,7 +3,7 @@ cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" # Pushes application version into the build information. -RR_VERSION=1.5.1 +RR_VERSION=1.5.2 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 63cee6d4..27d1f74d 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -109,9 +109,7 @@ func Test_Pipe_Broken(t *testing.T) { }() defer func() { err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + assert.Error(t, err) }() res, err := w.Exec(&Payload{Body: []byte("hello")}) diff --git a/server_config.go b/server_config.go index 8c96aaa8..641c1866 100644 --- a/server_config.go +++ b/server_config.go @@ -8,15 +8,22 @@ import ( "os" "os/exec" "strings" + "sync" "syscall" "time" ) +// CommandProducer can produce commands. +type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd + // ServerConfig config combines factory, pool and cmd configurations. type ServerConfig struct { // Command includes command strings with all the parameters, example: "php worker.php pipes". Command string + // CommandProducer overwrites + CommandProducer CommandProducer + // Relay defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" // This config section must not change on re-configuration. @@ -31,7 +38,8 @@ type ServerConfig struct { Pool *Config // values defines set of values to be passed to the command context. - env []string + mu sync.Mutex + env map[string]string } // InitDefaults sets missing values to their default values. @@ -68,18 +76,42 @@ func (cfg *ServerConfig) Differs(new *ServerConfig) bool { // SetEnv sets new environment variable. Value is automatically uppercase-d. func (cfg *ServerConfig) SetEnv(k, v string) { - cfg.env = append(cfg.env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.env == nil { + cfg.env = make(map[string]string) + } + + cfg.env[k] = v +} + +// GetEnv must return list of env variables. +func (cfg *ServerConfig) GetEnv() (env []string) { + env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay)) + for k, v := range cfg.env { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return } // makeCommands returns new command provider based on configured options. func (cfg *ServerConfig) makeCommand() func() *exec.Cmd { + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.CommandProducer != nil { + return cfg.CommandProducer(cfg) + } + var cmd = strings.Split(cfg.Command, " ") return func() *exec.Cmd { cmd := exec.Command(cmd[0], cmd[1:]...) osutil.IsolateProcess(cmd) - cmd.Env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay)) - cmd.Env = append(cmd.Env, cfg.env...) + cmd.Env = cfg.GetEnv() + return cmd } } diff --git a/server_config_test.go b/server_config_test.go index e408bde1..4f26d6ab 100644 --- a/server_config_test.go +++ b/server_config_test.go @@ -31,23 +31,24 @@ func Test_ServerConfig_PipeFactory(t *testing.T) { func Test_ServerConfig_SocketFactory(t *testing.T) { cfg := &ServerConfig{Relay: "tcp://:9111"} - f, err := cfg.makeFactory() + f1, err := cfg.makeFactory() assert.NoError(t, err) - assert.NotNil(t, f) + assert.NotNil(t, f1) defer func() { - err := f.Close() + err := f1.Close() + if err != nil { t.Errorf("error closing factory or underlying connections: error %v", err) } }() assert.NoError(t, err) - assert.IsType(t, &SocketFactory{}, f) - assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network()) - assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String()) + assert.IsType(t, &SocketFactory{}, f1) + assert.Equal(t, "tcp", f1.(*SocketFactory).ls.Addr().Network()) + assert.Equal(t, "[::]:9111", f1.(*SocketFactory).ls.Addr().String()) cfg = &ServerConfig{Relay: "tcp://localhost:9112"} - f, err = cfg.makeFactory() + f, err := cfg.makeFactory() assert.NoError(t, err) assert.NotNil(t, f) defer func() { diff --git a/service/container.go b/service/container.go index b543ccb6..742b4c3b 100644 --- a/service/container.go +++ b/service/container.go @@ -46,6 +46,9 @@ type Container interface { // Close all active services. Stop() + + // List service names. + List() []string } // Config provides ability to slice configuration sections and unmarshal configuration data into @@ -212,6 +215,16 @@ func (c *container) Stop() { } } +// List all service names. +func (c *container) List() []string { + names := make([]string, 0, len(c.services)) + for _, e := range c.services { + names = append(names, e.name) + } + + return names +} + // calls Init method with automatically resolved arguments. func (c *container) initService(s interface{}, segment Config) (bool, error) { r := reflect.TypeOf(s) diff --git a/service/container_test.go b/service/container_test.go index 33ad9491..5350de41 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -132,6 +132,20 @@ func TestContainer_Has(t *testing.T) { assert.False(t, c.Has("another")) } +func TestContainer_List(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testService{}) + + assert.Equal(t, 0, len(hook.Entries)) + assert.Equal(t, 1, len(c.List())) + + assert.True(t, c.Has("test")) + assert.False(t, c.Has("another")) +} + func TestContainer_Get(t *testing.T) { logger, hook := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -428,6 +442,10 @@ func TestContainer_InitErrorB(t *testing.T) { type testInitC struct{} +func (r *testInitC) Test() bool { + return true +} + func TestContainer_NoInit(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) @@ -449,7 +467,6 @@ func (c *DCfg) Hydrate(cfg Config) error { if err := cfg.Unmarshal(c); err != nil { return err } - if c.V == "fail" { return errors.New("failed config") } diff --git a/service/health/service.go b/service/health/service.go index a730de7e..c82f43b5 100644 --- a/service/health/service.go +++ b/service/health/service.go @@ -3,6 +3,7 @@ package health import ( "context" "fmt" + "github.com/sirupsen/logrus" "net/http" "sync" @@ -15,19 +16,21 @@ const ID = "health" // Service to serve an endpoint for checking the health of the worker pool type Service struct { cfg *Config + log *logrus.Logger mu sync.Mutex http *http.Server httpService *rrhttp.Service } // Init health service -func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) { +func (s *Service) Init(cfg *Config, r *rrhttp.Service, log *logrus.Logger) (bool, error) { // Ensure the httpService is set if r == nil { return false, nil } s.cfg = cfg + s.log = log s.httpService = r return true, nil } @@ -38,7 +41,13 @@ func (s *Service) Serve() error { s.mu.Lock() s.http = &http.Server{Addr: s.cfg.Address, Handler: s} s.mu.Unlock() - return s.http.ListenAndServe() + + err := s.http.ListenAndServe() + if err == nil || err == http.ErrServerClosed { + return nil + } + + return err } // Stop the health endpoint @@ -50,9 +59,8 @@ func (s *Service) Stop() { // gracefully stop the server go func() { err := s.http.Shutdown(context.Background()) - if err != nil { - // TODO how to log error here? - fmt.Println(fmt.Errorf("error shutting down the server: error %v", err)) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err)) } }() } diff --git a/service/http/handler.go b/service/http/handler.go index 4de33844..3c667035 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -2,6 +2,7 @@ package http import ( "github.com/pkg/errors" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "net" "net/http" @@ -59,6 +60,7 @@ func (e *ResponseEvent) Elapsed() time.Duration { // parsed files and query, payload will include parsed form dataTree (if any). type Handler struct { cfg *Config + log *logrus.Logger rr *roadrunner.Server mul sync.Mutex lsn func(event int, ctx interface{}) @@ -98,8 +100,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // proxy IP resolution h.resolveIP(req) - req.Open() - defer req.Close() + req.Open(h.log) + defer req.Close(h.log) p, err := req.Payload() if err != nil { diff --git a/service/http/handler_test.go b/service/http/handler_test.go index 0db999c9..994a663c 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -96,7 +96,7 @@ func TestHandler_Echo(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -197,11 +197,11 @@ func TestHandler_Headers(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 100) req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil) assert.NoError(t, err) @@ -260,7 +260,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -322,7 +322,7 @@ func TestHandler_User_Agent(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -384,7 +384,7 @@ func TestHandler_Cookies(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -451,7 +451,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -517,7 +517,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -534,7 +534,6 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { err := r.Body.Close() if err != nil { t.Errorf("error during the closing Body: error %v", err) - } }() @@ -579,7 +578,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -641,7 +640,7 @@ func TestHandler_FormData_POST(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -715,7 +714,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -783,7 +782,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -857,7 +856,7 @@ func TestHandler_FormData_PUT(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -931,7 +930,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1005,7 +1004,7 @@ func TestHandler_Multipart_POST(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1044,6 +1043,7 @@ func TestHandler_Multipart_POST(t *testing.T) { } err = w.WriteField("arr[x][y][e]", "f") + if err != nil { t.Errorf("error writing the field: error %v", err) } @@ -1120,7 +1120,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1139,6 +1139,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { } err = w.WriteField("name[]", "name1") + if err != nil { t.Errorf("error writing the field: error %v", err) } @@ -1235,7 +1236,8 @@ func TestHandler_Multipart_PATCH(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1259,11 +1261,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) { } err = w.WriteField("name[]", "name2") + if err != nil { t.Errorf("error writing the field: error %v", err) } err = w.WriteField("name[]", "name3") + if err != nil { t.Errorf("error writing the field: error %v", err) } @@ -1350,7 +1354,7 @@ func TestHandler_Error(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1394,7 +1398,7 @@ func TestHandler_Error2(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1438,7 +1442,7 @@ func TestHandler_Error3(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1499,7 +1503,7 @@ func TestHandler_ResponseDuration(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1558,7 +1562,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1617,7 +1621,7 @@ func TestHandler_ErrorDuration(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1689,7 +1693,7 @@ func TestHandler_IP(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1748,7 +1752,7 @@ func TestHandler_XRealIP(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1811,7 +1815,7 @@ func TestHandler_XForwardedFor(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1874,7 +1878,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -1925,7 +1929,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { b.Errorf("error listening the interface: error %v", err) } }() diff --git a/service/http/request.go b/service/http/request.go index 98508342..5d91bfb6 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -3,6 +3,7 @@ package http import ( "encoding/json" "fmt" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service/http/attributes" "io/ioutil" @@ -112,21 +113,21 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { } // Open moves all uploaded files to temporary directory so it can be given to php later. -func (r *Request) Open() { +func (r *Request) Open(log *logrus.Logger) { if r.Uploads == nil { return } - r.Uploads.Open() + r.Uploads.Open(log) } // Close clears all temp file uploads -func (r *Request) Close() { +func (r *Request) Close(log *logrus.Logger) { if r.Uploads == nil { return } - r.Uploads.Clear() + r.Uploads.Clear(log) } // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open diff --git a/service/http/service.go b/service/http/service.go index 1547538b..abe7b3a7 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -3,6 +3,7 @@ package http import ( "context" "fmt" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/service/env" "github.com/spiral/roadrunner/service/http/attributes" @@ -31,6 +32,8 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc // Service manages rr, http servers. type Service struct { cfg *Config + log *logrus.Logger + cprod roadrunner.CommandProducer env env.Environment lsns []func(event int, ctx interface{}) mdwr []middleware @@ -48,6 +51,11 @@ func (s *Service) Attach(w roadrunner.Controller) { s.controller = w } +// ProduceCommands changes the default command generator method +func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) { + s.cprod = producer +} + // AddMiddleware adds new net/http mdwr. func (s *Service) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) @@ -60,8 +68,9 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) { +func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) { s.cfg = cfg + s.log = log s.env = e if r != nil { @@ -87,6 +96,7 @@ func (s *Service) Serve() error { } } + s.cfg.Workers.CommandProducer = s.cprod s.cfg.Workers.SetEnv("RR_HTTP", "true") s.rr = roadrunner.NewServer(s.cfg.Workers) @@ -132,19 +142,34 @@ func (s *Service) Serve() error { if s.http != nil { go func() { - err <- s.http.ListenAndServe() + httpErr := s.http.ListenAndServe() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } if s.https != nil { go func() { - err <- s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key) + httpErr := s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key) + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } if s.fcgi != nil { go func() { - err <- s.serveFCGI() + httpErr := s.serveFCGI() + if httpErr != nil && httpErr != http.ErrServerClosed { + err <- httpErr + } else { + err <- nil + } }() } @@ -159,11 +184,10 @@ func (s *Service) Stop() { if s.fcgi != nil { go func() { err := s.fcgi.Shutdown(context.Background()) - if err != nil { - // TODO think about returning error from this Stop function + if err != nil && err != http.ErrServerClosed { // Stop() error // push error from goroutines to the channel and block unil error or success shutdown or timeout - fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err)) + s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err)) return } }() @@ -171,9 +195,9 @@ func (s *Service) Stop() { if s.https != nil { go func() { - err := s.fcgi.Shutdown(context.Background()) - if err != nil { - fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err)) + err := s.https.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err)) return } }() @@ -181,9 +205,9 @@ func (s *Service) Stop() { if s.http != nil { go func() { - err := s.fcgi.Shutdown(context.Background()) - if err != nil { - fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err)) + err := s.http.Shutdown(context.Background()) + if err != nil && err != http.ErrServerClosed { + s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err)) return } }() diff --git a/service/http/service_test.go b/service/http/service_test.go index bfc10971..c4b2c2c4 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -54,9 +54,7 @@ func Test_Service_NoConfig(t *testing.T) { c.Register(ID, &Service{}) err := c.Init(&testCfg{httpCfg: `{"Enable":true}`}) - if err != nil { - t.Errorf("error during the Init: error %v", err) - } + assert.Error(t, err) s, st := c.Get(ID) assert.NotNil(t, s) diff --git a/service/http/uploads.go b/service/http/uploads.go index 7efa7e4a..8a46f230 100644 --- a/service/http/uploads.go +++ b/service/http/uploads.go @@ -3,6 +3,7 @@ package http import ( "encoding/json" "fmt" + "github.com/sirupsen/logrus" "io" "io/ioutil" "mime/multipart" @@ -46,16 +47,15 @@ func (u *Uploads) MarshalJSON() ([]byte, error) { // Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors // will be handled individually. -func (u *Uploads) Open() { +func (u *Uploads) Open(log *logrus.Logger) { var wg sync.WaitGroup for _, f := range u.list { wg.Add(1) go func(f *FileUpload) { defer wg.Done() err := f.Open(u.cfg) - if err != nil { - // TODO handle error mechanism in goroutines - fmt.Println(fmt.Errorf("error opening the file: error %v", err)) + if err != nil && log != nil { + log.Error(fmt.Errorf("error opening the file: error %v", err)) } }(f) } @@ -64,13 +64,12 @@ func (u *Uploads) Open() { } // Clear deletes all temporary files. -func (u *Uploads) Clear() { +func (u *Uploads) Clear(log *logrus.Logger) { for _, f := range u.list { if f.TempFilename != "" && exists(f.TempFilename) { err := os.Remove(f.TempFilename) - if err != nil { - // TODO error handling mechanism - fmt.Println(fmt.Errorf("error removing the file: error %v", err)) + if err != nil && log != nil { + log.Error(fmt.Errorf("error removing the file: error %v", err)) } } } @@ -131,7 +130,6 @@ func (f *FileUpload) Open(cfg *UploadsConfig) (err error) { err = file.Close() }() - tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload") if err != nil { // most likely cause of this issue is missing tmp dir diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go index c5de224b..1890c02b 100644 --- a/service/http/uploads_test.go +++ b/service/http/uploads_test.go @@ -51,7 +51,7 @@ func TestHandler_Upload_File(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -138,7 +138,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -225,7 +225,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() @@ -312,7 +312,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) { go func() { err := hs.ListenAndServe() - if err != nil { + if err != nil && err != http.ErrServerClosed { t.Errorf("error listening the interface: error %v", err) } }() diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go index feae927a..2468c083 100644 --- a/service/metrics/rpc_test.go +++ b/service/metrics/rpc_test.go @@ -48,10 +48,13 @@ func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.C t.Errorf("error during the Serve: error %v", err) } }() - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) client, err := rs.Client() assert.NoError(t, err) + if err != nil { + panic(err) + } return client, c } diff --git a/service/metrics/service.go b/service/metrics/service.go index 9e2a1a71..6fa4da50 100644 --- a/service/metrics/service.go +++ b/service/metrics/service.go @@ -1,10 +1,13 @@ package metrics +// todo: declare metric at runtime + import ( "context" "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" @@ -16,6 +19,7 @@ const ID = "metrics" // Service to manage application metrics using Prometheus. type Service struct { cfg *Config + log *logrus.Logger mu sync.Mutex http *http.Server collectors sync.Map @@ -23,8 +27,9 @@ type Service struct { } // Init service. -func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { +func (s *Service) Init(cfg *Config, r *rpc.Service, log *logrus.Logger) (bool, error) { s.cfg = cfg + s.log = log s.registry = prometheus.NewRegistry() s.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) @@ -77,7 +82,12 @@ func (s *Service) Serve() error { )} s.mu.Unlock() - return s.http.ListenAndServe() + err = s.http.ListenAndServe() + if err == nil || err == http.ErrServerClosed { + return nil + } + + return err } // Stop prometheus metrics service. @@ -90,9 +100,8 @@ func (s *Service) Stop() { go func() { err := s.http.Shutdown(context.Background()) if err != nil { - // TODO how to show error message? // Function should be Stop() error - fmt.Println(fmt.Errorf("error shutting down the server: error %v", err)) + s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err)) } }() } diff --git a/socket_factory_test.go b/socket_factory_test.go index e718f6c2..9f74cf8c 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -191,11 +191,10 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() + defer func() { err = w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + assert.Error(t, err) }() res, err := w.Exec(&Payload{Body: []byte("hello")}) diff --git a/static_pool_test.go b/static_pool_test.go index f8ad4a4d..1f185f58 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -6,6 +6,7 @@ import ( "os/exec" "runtime" "strconv" + "strings" "sync" "testing" "time" @@ -154,9 +155,12 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) + done := make(chan interface{}) p.Listen(func(e int, ctx interface{}) { if err, ok := ctx.(error); ok { - assert.Contains(t, err.Error(), "undefined_function()") + if strings.Contains(err.Error(), "undefined_function()") { + close(done) + } } }) @@ -164,6 +168,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) + + <-done } func Test_StaticPool_Broken_FromOutside(t *testing.T) { @@ -195,12 +201,10 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { }) // killing random worker and expecting pool to replace it - p.muw.Lock() err = p.Workers()[0].cmd.Process.Kill() if err != nil { t.Errorf("error killing the process: error %v", err) } - p.muw.Unlock() <-destructed for _, w := range p.Workers() { @@ -106,14 +106,8 @@ func (w *Worker) Wait() error { if runtime.GOOS != "windows" { // windows handles processes and close pipes differently, // we can ignore wait here as process.Wait() already being handled above - var ws syscall.WaitStatus - _, err := syscall.Wait4(w.cmd.Process.Pid, &ws, syscall.WALL, nil) - if err != nil { - if ws.Exited() { - return nil - } else { - return err - } + if err := w.cmd.Wait(); err != nil { + return err } } diff --git a/worker_test.go b/worker_test.go index 815d60c2..e8cbef90 100644 --- a/worker_test.go +++ b/worker_test.go @@ -166,11 +166,10 @@ func Test_Broken(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() + defer func() { err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + assert.Error(t, err) }() res, err := w.Exec(&Payload{Body: []byte("hello")}) @@ -196,6 +195,7 @@ func Test_Error(t *testing.T) { go func() { assert.NoError(t, w.Wait()) }() + defer func() { err := w.Stop() if err != nil { |