summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Titov <[email protected]>2019-12-23 14:52:06 +0300
committerGitHub <[email protected]>2019-12-23 14:52:06 +0300
commit921e1f55e23ab75b8250045916c8d1ffad1b8bde (patch)
tree00b16331b9ff3b3b846ba22989dddde721cc959d
parent921354df1aa4687837e3ba6ac0eb04d39321c149 (diff)
parent2093cb9058f94668fff0a97beb76b0cab66c7b63 (diff)
Merge branch 'master' into Fix_warning_and_issues
-rw-r--r--CHANGELOG.md5
-rwxr-xr-xbuild.sh2
-rw-r--r--pipe_factory_test.go4
-rw-r--r--server_config.go40
-rw-r--r--server_config_test.go15
-rw-r--r--service/container.go13
-rw-r--r--service/container_test.go19
-rw-r--r--service/health/service.go18
-rw-r--r--service/http/handler.go6
-rw-r--r--service/http/handler_test.go62
-rw-r--r--service/http/request.go9
-rw-r--r--service/http/service.go50
-rw-r--r--service/http/service_test.go4
-rw-r--r--service/http/uploads.go16
-rw-r--r--service/http/uploads_test.go8
-rw-r--r--service/metrics/rpc_test.go5
-rw-r--r--service/metrics/service.go17
-rw-r--r--socket_factory_test.go5
-rw-r--r--static_pool_test.go10
-rw-r--r--worker.go10
-rw-r--r--worker_test.go6
21 files changed, 217 insertions, 107 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a40a7096..12924618 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
CHANGELOG
=========
+v1.5.2 (05.12.2019)
+-------------------
+- added support for symfony/console 5.0 by @coxa
+- added support for HTTP2 trailers by @filakhtov
+
v1.5.1 (22.10.2019)
-------------------
- bugfix: do not halt stop sequence in case of service error
diff --git a/build.sh b/build.sh
index a071c7ad..128b2f09 100755
--- a/build.sh
+++ b/build.sh
@@ -3,7 +3,7 @@ cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
# Pushes application version into the build information.
-RR_VERSION=1.5.1
+RR_VERSION=1.5.2
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 63cee6d4..27d1f74d 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -109,9 +109,7 @@ func Test_Pipe_Broken(t *testing.T) {
}()
defer func() {
err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ assert.Error(t, err)
}()
res, err := w.Exec(&Payload{Body: []byte("hello")})
diff --git a/server_config.go b/server_config.go
index 8c96aaa8..641c1866 100644
--- a/server_config.go
+++ b/server_config.go
@@ -8,15 +8,22 @@ import (
"os"
"os/exec"
"strings"
+ "sync"
"syscall"
"time"
)
+// CommandProducer can produce commands.
+type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd
+
// ServerConfig config combines factory, pool and cmd configurations.
type ServerConfig struct {
// Command includes command strings with all the parameters, example: "php worker.php pipes".
Command string
+ // CommandProducer overwrites
+ CommandProducer CommandProducer
+
// Relay defines connection method and factory to be used to connect to workers:
// "pipes", "tcp://:6001", "unix://rr.sock"
// This config section must not change on re-configuration.
@@ -31,7 +38,8 @@ type ServerConfig struct {
Pool *Config
// values defines set of values to be passed to the command context.
- env []string
+ mu sync.Mutex
+ env map[string]string
}
// InitDefaults sets missing values to their default values.
@@ -68,18 +76,42 @@ func (cfg *ServerConfig) Differs(new *ServerConfig) bool {
// SetEnv sets new environment variable. Value is automatically uppercase-d.
func (cfg *ServerConfig) SetEnv(k, v string) {
- cfg.env = append(cfg.env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ cfg.mu.Lock()
+ defer cfg.mu.Unlock()
+
+ if cfg.env == nil {
+ cfg.env = make(map[string]string)
+ }
+
+ cfg.env[k] = v
+}
+
+// GetEnv must return list of env variables.
+func (cfg *ServerConfig) GetEnv() (env []string) {
+ env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay))
+ for k, v := range cfg.env {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+
+ return
}
// makeCommands returns new command provider based on configured options.
func (cfg *ServerConfig) makeCommand() func() *exec.Cmd {
+ cfg.mu.Lock()
+ defer cfg.mu.Unlock()
+
+ if cfg.CommandProducer != nil {
+ return cfg.CommandProducer(cfg)
+ }
+
var cmd = strings.Split(cfg.Command, " ")
return func() *exec.Cmd {
cmd := exec.Command(cmd[0], cmd[1:]...)
osutil.IsolateProcess(cmd)
- cmd.Env = append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", cfg.Relay))
- cmd.Env = append(cmd.Env, cfg.env...)
+ cmd.Env = cfg.GetEnv()
+
return cmd
}
}
diff --git a/server_config_test.go b/server_config_test.go
index e408bde1..4f26d6ab 100644
--- a/server_config_test.go
+++ b/server_config_test.go
@@ -31,23 +31,24 @@ func Test_ServerConfig_PipeFactory(t *testing.T) {
func Test_ServerConfig_SocketFactory(t *testing.T) {
cfg := &ServerConfig{Relay: "tcp://:9111"}
- f, err := cfg.makeFactory()
+ f1, err := cfg.makeFactory()
assert.NoError(t, err)
- assert.NotNil(t, f)
+ assert.NotNil(t, f1)
defer func() {
- err := f.Close()
+ err := f1.Close()
+
if err != nil {
t.Errorf("error closing factory or underlying connections: error %v", err)
}
}()
assert.NoError(t, err)
- assert.IsType(t, &SocketFactory{}, f)
- assert.Equal(t, "tcp", f.(*SocketFactory).ls.Addr().Network())
- assert.Equal(t, "[::]:9111", f.(*SocketFactory).ls.Addr().String())
+ assert.IsType(t, &SocketFactory{}, f1)
+ assert.Equal(t, "tcp", f1.(*SocketFactory).ls.Addr().Network())
+ assert.Equal(t, "[::]:9111", f1.(*SocketFactory).ls.Addr().String())
cfg = &ServerConfig{Relay: "tcp://localhost:9112"}
- f, err = cfg.makeFactory()
+ f, err := cfg.makeFactory()
assert.NoError(t, err)
assert.NotNil(t, f)
defer func() {
diff --git a/service/container.go b/service/container.go
index b543ccb6..742b4c3b 100644
--- a/service/container.go
+++ b/service/container.go
@@ -46,6 +46,9 @@ type Container interface {
// Close all active services.
Stop()
+
+ // List service names.
+ List() []string
}
// Config provides ability to slice configuration sections and unmarshal configuration data into
@@ -212,6 +215,16 @@ func (c *container) Stop() {
}
}
+// List all service names.
+func (c *container) List() []string {
+ names := make([]string, 0, len(c.services))
+ for _, e := range c.services {
+ names = append(names, e.name)
+ }
+
+ return names
+}
+
// calls Init method with automatically resolved arguments.
func (c *container) initService(s interface{}, segment Config) (bool, error) {
r := reflect.TypeOf(s)
diff --git a/service/container_test.go b/service/container_test.go
index 33ad9491..5350de41 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -132,6 +132,20 @@ func TestContainer_Has(t *testing.T) {
assert.False(t, c.Has("another"))
}
+func TestContainer_List(t *testing.T) {
+ logger, hook := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testService{})
+
+ assert.Equal(t, 0, len(hook.Entries))
+ assert.Equal(t, 1, len(c.List()))
+
+ assert.True(t, c.Has("test"))
+ assert.False(t, c.Has("another"))
+}
+
func TestContainer_Get(t *testing.T) {
logger, hook := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
@@ -428,6 +442,10 @@ func TestContainer_InitErrorB(t *testing.T) {
type testInitC struct{}
+func (r *testInitC) Test() bool {
+ return true
+}
+
func TestContainer_NoInit(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
@@ -449,7 +467,6 @@ func (c *DCfg) Hydrate(cfg Config) error {
if err := cfg.Unmarshal(c); err != nil {
return err
}
-
if c.V == "fail" {
return errors.New("failed config")
}
diff --git a/service/health/service.go b/service/health/service.go
index a730de7e..c82f43b5 100644
--- a/service/health/service.go
+++ b/service/health/service.go
@@ -3,6 +3,7 @@ package health
import (
"context"
"fmt"
+ "github.com/sirupsen/logrus"
"net/http"
"sync"
@@ -15,19 +16,21 @@ const ID = "health"
// Service to serve an endpoint for checking the health of the worker pool
type Service struct {
cfg *Config
+ log *logrus.Logger
mu sync.Mutex
http *http.Server
httpService *rrhttp.Service
}
// Init health service
-func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rrhttp.Service, log *logrus.Logger) (bool, error) {
// Ensure the httpService is set
if r == nil {
return false, nil
}
s.cfg = cfg
+ s.log = log
s.httpService = r
return true, nil
}
@@ -38,7 +41,13 @@ func (s *Service) Serve() error {
s.mu.Lock()
s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
s.mu.Unlock()
- return s.http.ListenAndServe()
+
+ err := s.http.ListenAndServe()
+ if err == nil || err == http.ErrServerClosed {
+ return nil
+ }
+
+ return err
}
// Stop the health endpoint
@@ -50,9 +59,8 @@ func (s *Service) Stop() {
// gracefully stop the server
go func() {
err := s.http.Shutdown(context.Background())
- if err != nil {
- // TODO how to log error here?
- fmt.Println(fmt.Errorf("error shutting down the server: error %v", err))
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err))
}
}()
}
diff --git a/service/http/handler.go b/service/http/handler.go
index 4de33844..3c667035 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -2,6 +2,7 @@ package http
import (
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"net"
"net/http"
@@ -59,6 +60,7 @@ func (e *ResponseEvent) Elapsed() time.Duration {
// parsed files and query, payload will include parsed form dataTree (if any).
type Handler struct {
cfg *Config
+ log *logrus.Logger
rr *roadrunner.Server
mul sync.Mutex
lsn func(event int, ctx interface{})
@@ -98,8 +100,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// proxy IP resolution
h.resolveIP(req)
- req.Open()
- defer req.Close()
+ req.Open(h.log)
+ defer req.Close(h.log)
p, err := req.Payload()
if err != nil {
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index 0db999c9..994a663c 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -96,7 +96,7 @@ func TestHandler_Echo(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -197,11 +197,11 @@ func TestHandler_Headers(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(time.Millisecond * 100)
req, err := http.NewRequest("GET", "http://localhost:8078?hello=world", nil)
assert.NoError(t, err)
@@ -260,7 +260,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -322,7 +322,7 @@ func TestHandler_User_Agent(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -384,7 +384,7 @@ func TestHandler_Cookies(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -451,7 +451,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -517,7 +517,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -534,7 +534,6 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
err := r.Body.Close()
if err != nil {
t.Errorf("error during the closing Body: error %v", err)
-
}
}()
@@ -579,7 +578,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -641,7 +640,7 @@ func TestHandler_FormData_POST(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -715,7 +714,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -783,7 +782,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -857,7 +856,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -931,7 +930,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1005,7 +1004,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1044,6 +1043,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
}
err = w.WriteField("arr[x][y][e]", "f")
+
if err != nil {
t.Errorf("error writing the field: error %v", err)
}
@@ -1120,7 +1120,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1139,6 +1139,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
}
err = w.WriteField("name[]", "name1")
+
if err != nil {
t.Errorf("error writing the field: error %v", err)
}
@@ -1235,7 +1236,8 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1259,11 +1261,13 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
}
err = w.WriteField("name[]", "name2")
+
if err != nil {
t.Errorf("error writing the field: error %v", err)
}
err = w.WriteField("name[]", "name3")
+
if err != nil {
t.Errorf("error writing the field: error %v", err)
}
@@ -1350,7 +1354,7 @@ func TestHandler_Error(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1394,7 +1398,7 @@ func TestHandler_Error2(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1438,7 +1442,7 @@ func TestHandler_Error3(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1499,7 +1503,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1558,7 +1562,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1617,7 +1621,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1689,7 +1693,7 @@ func TestHandler_IP(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1748,7 +1752,7 @@ func TestHandler_XRealIP(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1811,7 +1815,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1874,7 +1878,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -1925,7 +1929,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
b.Errorf("error listening the interface: error %v", err)
}
}()
diff --git a/service/http/request.go b/service/http/request.go
index 98508342..5d91bfb6 100644
--- a/service/http/request.go
+++ b/service/http/request.go
@@ -3,6 +3,7 @@ package http
import (
"encoding/json"
"fmt"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service/http/attributes"
"io/ioutil"
@@ -112,21 +113,21 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
}
// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open() {
+func (r *Request) Open(log *logrus.Logger) {
if r.Uploads == nil {
return
}
- r.Uploads.Open()
+ r.Uploads.Open(log)
}
// Close clears all temp file uploads
-func (r *Request) Close() {
+func (r *Request) Close(log *logrus.Logger) {
if r.Uploads == nil {
return
}
- r.Uploads.Clear()
+ r.Uploads.Clear(log)
}
// Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open
diff --git a/service/http/service.go b/service/http/service.go
index 1547538b..abe7b3a7 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -3,6 +3,7 @@ package http
import (
"context"
"fmt"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/service/env"
"github.com/spiral/roadrunner/service/http/attributes"
@@ -31,6 +32,8 @@ type middleware func(f http.HandlerFunc) http.HandlerFunc
// Service manages rr, http servers.
type Service struct {
cfg *Config
+ log *logrus.Logger
+ cprod roadrunner.CommandProducer
env env.Environment
lsns []func(event int, ctx interface{})
mdwr []middleware
@@ -48,6 +51,11 @@ func (s *Service) Attach(w roadrunner.Controller) {
s.controller = w
}
+// ProduceCommands changes the default command generator method
+func (s *Service) ProduceCommands(producer roadrunner.CommandProducer) {
+ s.cprod = producer
+}
+
// AddMiddleware adds new net/http mdwr.
func (s *Service) AddMiddleware(m middleware) {
s.mdwr = append(s.mdwr, m)
@@ -60,8 +68,9 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment, log *logrus.Logger) (bool, error) {
s.cfg = cfg
+ s.log = log
s.env = e
if r != nil {
@@ -87,6 +96,7 @@ func (s *Service) Serve() error {
}
}
+ s.cfg.Workers.CommandProducer = s.cprod
s.cfg.Workers.SetEnv("RR_HTTP", "true")
s.rr = roadrunner.NewServer(s.cfg.Workers)
@@ -132,19 +142,34 @@ func (s *Service) Serve() error {
if s.http != nil {
go func() {
- err <- s.http.ListenAndServe()
+ httpErr := s.http.ListenAndServe()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
if s.https != nil {
go func() {
- err <- s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key)
+ httpErr := s.https.ListenAndServeTLS(s.cfg.SSL.Cert, s.cfg.SSL.Key)
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
if s.fcgi != nil {
go func() {
- err <- s.serveFCGI()
+ httpErr := s.serveFCGI()
+ if httpErr != nil && httpErr != http.ErrServerClosed {
+ err <- httpErr
+ } else {
+ err <- nil
+ }
}()
}
@@ -159,11 +184,10 @@ func (s *Service) Stop() {
if s.fcgi != nil {
go func() {
err := s.fcgi.Shutdown(context.Background())
- if err != nil {
- // TODO think about returning error from this Stop function
+ if err != nil && err != http.ErrServerClosed {
// Stop() error
// push error from goroutines to the channel and block unil error or success shutdown or timeout
- fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err))
+ s.log.Error(fmt.Errorf("error shutting down the fcgi server, error: %v", err))
return
}
}()
@@ -171,9 +195,9 @@ func (s *Service) Stop() {
if s.https != nil {
go func() {
- err := s.fcgi.Shutdown(context.Background())
- if err != nil {
- fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err))
+ err := s.https.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the https server, error: %v", err))
return
}
}()
@@ -181,9 +205,9 @@ func (s *Service) Stop() {
if s.http != nil {
go func() {
- err := s.fcgi.Shutdown(context.Background())
- if err != nil {
- fmt.Println(fmt.Errorf("error shutting down the server, error: %v", err))
+ err := s.http.Shutdown(context.Background())
+ if err != nil && err != http.ErrServerClosed {
+ s.log.Error(fmt.Errorf("error shutting down the http server, error: %v", err))
return
}
}()
diff --git a/service/http/service_test.go b/service/http/service_test.go
index bfc10971..c4b2c2c4 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -54,9 +54,7 @@ func Test_Service_NoConfig(t *testing.T) {
c.Register(ID, &Service{})
err := c.Init(&testCfg{httpCfg: `{"Enable":true}`})
- if err != nil {
- t.Errorf("error during the Init: error %v", err)
- }
+ assert.Error(t, err)
s, st := c.Get(ID)
assert.NotNil(t, s)
diff --git a/service/http/uploads.go b/service/http/uploads.go
index 7efa7e4a..8a46f230 100644
--- a/service/http/uploads.go
+++ b/service/http/uploads.go
@@ -3,6 +3,7 @@ package http
import (
"encoding/json"
"fmt"
+ "github.com/sirupsen/logrus"
"io"
"io/ioutil"
"mime/multipart"
@@ -46,16 +47,15 @@ func (u *Uploads) MarshalJSON() ([]byte, error) {
// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
// will be handled individually.
-func (u *Uploads) Open() {
+func (u *Uploads) Open(log *logrus.Logger) {
var wg sync.WaitGroup
for _, f := range u.list {
wg.Add(1)
go func(f *FileUpload) {
defer wg.Done()
err := f.Open(u.cfg)
- if err != nil {
- // TODO handle error mechanism in goroutines
- fmt.Println(fmt.Errorf("error opening the file: error %v", err))
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error opening the file: error %v", err))
}
}(f)
}
@@ -64,13 +64,12 @@ func (u *Uploads) Open() {
}
// Clear deletes all temporary files.
-func (u *Uploads) Clear() {
+func (u *Uploads) Clear(log *logrus.Logger) {
for _, f := range u.list {
if f.TempFilename != "" && exists(f.TempFilename) {
err := os.Remove(f.TempFilename)
- if err != nil {
- // TODO error handling mechanism
- fmt.Println(fmt.Errorf("error removing the file: error %v", err))
+ if err != nil && log != nil {
+ log.Error(fmt.Errorf("error removing the file: error %v", err))
}
}
}
@@ -131,7 +130,6 @@ func (f *FileUpload) Open(cfg *UploadsConfig) (err error) {
err = file.Close()
}()
-
tmp, err := ioutil.TempFile(cfg.TmpDir(), "upload")
if err != nil {
// most likely cause of this issue is missing tmp dir
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
index c5de224b..1890c02b 100644
--- a/service/http/uploads_test.go
+++ b/service/http/uploads_test.go
@@ -51,7 +51,7 @@ func TestHandler_Upload_File(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -138,7 +138,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -225,7 +225,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
@@ -312,7 +312,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
go func() {
err := hs.ListenAndServe()
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
t.Errorf("error listening the interface: error %v", err)
}
}()
diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go
index feae927a..2468c083 100644
--- a/service/metrics/rpc_test.go
+++ b/service/metrics/rpc_test.go
@@ -48,10 +48,13 @@ func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.C
t.Errorf("error during the Serve: error %v", err)
}
}()
- time.Sleep(time.Millisecond * 100)
+ time.Sleep(time.Millisecond * 200)
client, err := rs.Client()
assert.NoError(t, err)
+ if err != nil {
+ panic(err)
+ }
return client, c
}
diff --git a/service/metrics/service.go b/service/metrics/service.go
index 9e2a1a71..6fa4da50 100644
--- a/service/metrics/service.go
+++ b/service/metrics/service.go
@@ -1,10 +1,13 @@
package metrics
+// todo: declare metric at runtime
+
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
@@ -16,6 +19,7 @@ const ID = "metrics"
// Service to manage application metrics using Prometheus.
type Service struct {
cfg *Config
+ log *logrus.Logger
mu sync.Mutex
http *http.Server
collectors sync.Map
@@ -23,8 +27,9 @@ type Service struct {
}
// Init service.
-func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) {
+func (s *Service) Init(cfg *Config, r *rpc.Service, log *logrus.Logger) (bool, error) {
s.cfg = cfg
+ s.log = log
s.registry = prometheus.NewRegistry()
s.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
@@ -77,7 +82,12 @@ func (s *Service) Serve() error {
)}
s.mu.Unlock()
- return s.http.ListenAndServe()
+ err = s.http.ListenAndServe()
+ if err == nil || err == http.ErrServerClosed {
+ return nil
+ }
+
+ return err
}
// Stop prometheus metrics service.
@@ -90,9 +100,8 @@ func (s *Service) Stop() {
go func() {
err := s.http.Shutdown(context.Background())
if err != nil {
- // TODO how to show error message?
// Function should be Stop() error
- fmt.Println(fmt.Errorf("error shutting down the server: error %v", err))
+ s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err))
}
}()
}
diff --git a/socket_factory_test.go b/socket_factory_test.go
index e718f6c2..9f74cf8c 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -191,11 +191,10 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
+
defer func() {
err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ assert.Error(t, err)
}()
res, err := w.Exec(&Payload{Body: []byte("hello")})
diff --git a/static_pool_test.go b/static_pool_test.go
index f8ad4a4d..1f185f58 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -6,6 +6,7 @@ import (
"os/exec"
"runtime"
"strconv"
+ "strings"
"sync"
"testing"
"time"
@@ -154,9 +155,12 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
+ done := make(chan interface{})
p.Listen(func(e int, ctx interface{}) {
if err, ok := ctx.(error); ok {
- assert.Contains(t, err.Error(), "undefined_function()")
+ if strings.Contains(err.Error(), "undefined_function()") {
+ close(done)
+ }
}
})
@@ -164,6 +168,8 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
+
+ <-done
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
@@ -195,12 +201,10 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
})
// killing random worker and expecting pool to replace it
- p.muw.Lock()
err = p.Workers()[0].cmd.Process.Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- p.muw.Unlock()
<-destructed
for _, w := range p.Workers() {
diff --git a/worker.go b/worker.go
index a10d36d6..f31929bf 100644
--- a/worker.go
+++ b/worker.go
@@ -106,14 +106,8 @@ func (w *Worker) Wait() error {
if runtime.GOOS != "windows" {
// windows handles processes and close pipes differently,
// we can ignore wait here as process.Wait() already being handled above
- var ws syscall.WaitStatus
- _, err := syscall.Wait4(w.cmd.Process.Pid, &ws, syscall.WALL, nil)
- if err != nil {
- if ws.Exited() {
- return nil
- } else {
- return err
- }
+ if err := w.cmd.Wait(); err != nil {
+ return err
}
}
diff --git a/worker_test.go b/worker_test.go
index 815d60c2..e8cbef90 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -166,11 +166,10 @@ func Test_Broken(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
+
defer func() {
err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ assert.Error(t, err)
}()
res, err := w.Exec(&Payload{Body: []byte("hello")})
@@ -196,6 +195,7 @@ func Test_Error(t *testing.T) {
go func() {
assert.NoError(t, w.Wait())
}()
+
defer func() {
err := w.Stop()
if err != nil {