summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
Diffstat (limited to 'service')
-rw-r--r--service/container.go6
-rw-r--r--service/env/service.go2
-rw-r--r--service/http/config.go4
-rw-r--r--service/http/config_test.go30
-rw-r--r--service/http/handler.go4
-rw-r--r--service/http/handler_test.go50
-rw-r--r--service/http/rpc.go4
-rw-r--r--service/http/rpc_test.go6
-rw-r--r--service/http/service.go23
-rw-r--r--service/http/service_test.go20
-rw-r--r--service/http/uploads_test.go8
-rw-r--r--service/rpc/service.go11
-rw-r--r--service/rpc/service_test.go13
-rw-r--r--service/rpc/system.go18
-rw-r--r--service/static/service.go2
-rw-r--r--service/static/service_test.go18
-rw-r--r--service/watcher/config.go48
-rw-r--r--service/watcher/service.go46
-rw-r--r--service/watcher/state_watch.go58
-rw-r--r--service/watcher/watcher.go153
20 files changed, 436 insertions, 88 deletions
diff --git a/service/container.go b/service/container.go
index 275cfffd..abeaf369 100644
--- a/service/container.go
+++ b/service/container.go
@@ -16,13 +16,13 @@ var errNoConfig = fmt.Errorf("no config has been provided")
// implement service.HydrateConfig.
const InitMethod = "Init"
-// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept
+// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept
// other services and/or configs as dependency.
type Service interface {
// Serve serves.
Serve() error
- // Stop stops the service.
+ // Detach stops the service.
Stop()
}
@@ -198,7 +198,7 @@ func (c *container) Serve() error {
return nil
}
-// Stop sends stop command to all running services.
+// Detach sends stop command to all running services.
func (c *container) Stop() {
for _, e := range c.services {
if e.hasStatus(StatusServing) {
diff --git a/service/env/service.go b/service/env/service.go
index 83175b36..00bc41ef 100644
--- a/service/env/service.go
+++ b/service/env/service.go
@@ -3,7 +3,7 @@ package env
// ID contains default service name.
const ID = "env"
-// Service provides ability to map _ENV values from config file.
+// Services provides ability to map _ENV values from config file.
type Service struct {
// values is default set of values.
values map[string]string
diff --git a/service/http/config.go b/service/http/config.go
index 5a2c8768..899a5083 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -17,8 +17,8 @@ type Config struct {
// SSL defines https server options.
SSL SSLConfig
- // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited.
- MaxRequest int64
+ // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
+ MaxRequestSize int64
// Uploads configures uploads configuration.
Uploads *UploadsConfig
diff --git a/service/http/config_test.go b/service/http/config_test.go
index 07901cb6..4cd2783f 100644
--- a/service/http/config_test.go
+++ b/service/http/config_test.go
@@ -31,8 +31,8 @@ func Test_Config_Hydrate_Error2(t *testing.T) {
func Test_Config_Valid(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -58,7 +58,7 @@ func Test_Config_Valid_SSL(t *testing.T) {
Cert: "fixtures/server.crt",
Key: "fixtures/server.key",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -87,7 +87,7 @@ func Test_Config_SSL_No_key(t *testing.T) {
SSL: SSLConfig{
Cert: "fixtures/server.crt",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -112,7 +112,7 @@ func Test_Config_SSL_No_Cert(t *testing.T) {
SSL: SSLConfig{
Key: "fixtures/server.key",
},
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -133,8 +133,8 @@ func Test_Config_SSL_No_Cert(t *testing.T) {
func Test_Config_NoUploads(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Workers: &roadrunner.ServerConfig{
Command: "php tests/client.php echo pipes",
Relay: "pipes",
@@ -151,8 +151,8 @@ func Test_Config_NoUploads(t *testing.T) {
func Test_Config_NoWorkers(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -164,8 +164,8 @@ func Test_Config_NoWorkers(t *testing.T) {
func Test_Config_NoPool(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -186,8 +186,8 @@ func Test_Config_NoPool(t *testing.T) {
func Test_Config_DeadPool(t *testing.T) {
cfg := &Config{
- Address: ":8080",
- MaxRequest: 1024,
+ Address: ":8080",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
@@ -203,8 +203,8 @@ func Test_Config_DeadPool(t *testing.T) {
func Test_Config_InvalidAddress(t *testing.T) {
cfg := &Config{
- Address: "",
- MaxRequest: 1024,
+ Address: "",
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
diff --git a/service/http/handler.go b/service/http/handler.go
index 8cebc42a..a7a6d4d0 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -75,12 +75,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// validating request size
- if h.cfg.MaxRequest != 0 {
+ if h.cfg.MaxRequestSize != 0 {
if length := r.Header.Get("content-length"); length != "" {
if size, err := strconv.ParseInt(length, 10, 64); err != nil {
h.handleError(w, r, err, start)
return
- } else if size > h.cfg.MaxRequest*1024*1024 {
+ } else if size > h.cfg.MaxRequestSize*1024*1024 {
h.handleError(w, r, errors.New("request body max size is exceeded"), start)
return
}
diff --git a/service/http/handler_test.go b/service/http/handler_test.go
index d876ef8e..5d4f7659 100644
--- a/service/http/handler_test.go
+++ b/service/http/handler_test.go
@@ -32,7 +32,7 @@ func get(url string) (string, *http.Response, error) {
func TestHandler_Echo(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -67,7 +67,7 @@ func TestHandler_Echo(t *testing.T) {
func Test_HandlerErrors(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -94,7 +94,7 @@ func Test_HandlerErrors(t *testing.T) {
func Test_Handler_JSON_error(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -123,7 +123,7 @@ func Test_Handler_JSON_error(t *testing.T) {
func TestHandler_Headers(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -170,7 +170,7 @@ func TestHandler_Headers(t *testing.T) {
func TestHandler_Empty_User_Agent(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -216,7 +216,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
func TestHandler_User_Agent(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -262,7 +262,7 @@ func TestHandler_User_Agent(t *testing.T) {
func TestHandler_Cookies(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -313,7 +313,7 @@ func TestHandler_Cookies(t *testing.T) {
func TestHandler_JsonPayload_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -363,7 +363,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
func TestHandler_JsonPayload_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -409,7 +409,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
func TestHandler_JsonPayload_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -455,7 +455,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
func TestHandler_FormData_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -513,7 +513,7 @@ func TestHandler_FormData_POST(t *testing.T) {
func TestHandler_FormData_POST_Overwrite(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -565,7 +565,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -623,7 +623,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
func TestHandler_FormData_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -681,7 +681,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
func TestHandler_FormData_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -739,7 +739,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
func TestHandler_Multipart_POST(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -801,7 +801,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
func TestHandler_Multipart_PUT(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -863,7 +863,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
func TestHandler_Multipart_PATCH(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -925,7 +925,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
func TestHandler_Error(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -959,7 +959,7 @@ func TestHandler_Error(t *testing.T) {
func TestHandler_Error2(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -993,7 +993,7 @@ func TestHandler_Error2(t *testing.T) {
func TestHandler_Error3(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1,
+ MaxRequestSize: 1,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1038,7 +1038,7 @@ func TestHandler_Error3(t *testing.T) {
func TestHandler_ResponseDuration(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1087,7 +1087,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
func TestHandler_ResponseDurationDelayed(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1136,7 +1136,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
func TestHandler_ErrorDuration(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -1184,7 +1184,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
func BenchmarkHandler_Listen_Echo(b *testing.B) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
diff --git a/service/http/rpc.go b/service/http/rpc.go
index 3390a93d..7b38dece 100644
--- a/service/http/rpc.go
+++ b/service/http/rpc.go
@@ -20,7 +20,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error {
}
*r = "OK"
- return rpc.svc.rr.Reset()
+ return rpc.svc.Server().Reset()
}
// Workers returns list of active workers and their stats.
@@ -29,6 +29,6 @@ func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
return errors.New("http server is not running")
}
- r.Workers, err = util.ServerState(rpc.svc.rr)
+ r.Workers, err = util.ServerState(rpc.svc.Server())
return err
}
diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go
index ba3efd2e..669b201c 100644
--- a/service/http/rpc_test.go
+++ b/service/http/rpc_test.go
@@ -27,7 +27,7 @@ func Test_RPC(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -88,7 +88,7 @@ func Test_RPC_Unix(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -142,7 +142,7 @@ func Test_Workers(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/http/service.go b/service/http/service.go
index ad59f887..651284b4 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -25,7 +25,7 @@ const (
// http middleware type.
type middleware func(f http.HandlerFunc) http.HandlerFunc
-// Service manages rr, http servers.
+// Services manages rr, http servers.
type Service struct {
cfg *Config
env env.Environment
@@ -33,11 +33,17 @@ type Service struct {
mdwr []middleware
mu sync.Mutex
rr *roadrunner.Server
+ watcher roadrunner.Watcher
handler *Handler
http *http.Server
https *http.Server
}
+// Watch attaches watcher.
+func (s *Service) Watch(w roadrunner.Watcher) {
+ s.watcher = w
+}
+
// AddMiddleware adds new net/http mdwr.
func (s *Service) AddMiddleware(m middleware) {
s.mdwr = append(s.mdwr, m)
@@ -53,6 +59,7 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) {
func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) {
s.cfg = cfg
s.env = e
+
if r != nil {
if err := r.Register(ID, &rpcServer{s}); err != nil {
return false, err
@@ -77,6 +84,10 @@ func (s *Service) Serve() error {
s.rr = roadrunner.NewServer(s.cfg.Workers)
s.rr.Listen(s.throw)
+ if s.watcher != nil {
+ s.rr.Watch(s.watcher)
+ }
+
s.handler = &Handler{cfg: s.cfg, rr: s.rr}
s.handler.Listen(s.throw)
@@ -102,7 +113,7 @@ func (s *Service) Serve() error {
return <-err
}
-// Stop stops the svc.
+// Detach stops the svc.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
@@ -117,6 +128,14 @@ func (s *Service) Stop() {
go s.http.Shutdown(context.Background())
}
+// Server returns associated roadrunner server (if any).
+func (s *Service) Server() *roadrunner.Server {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ return s.rr
+}
+
// ServeHTTP handles connection using set of middleware and rr PSR-7 server.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
diff --git a/service/http/service_test.go b/service/http/service_test.go
index d1d601dc..5b6d60d8 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -84,7 +84,7 @@ func Test_Service_Configure_Enable(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":8070",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -115,7 +115,7 @@ func Test_Service_Echo(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -168,7 +168,7 @@ func Test_Service_Env(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -220,7 +220,7 @@ func Test_Service_ErrorEcho(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -280,7 +280,7 @@ func Test_Service_Middleware(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -354,7 +354,7 @@ func Test_Service_Listener(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -398,7 +398,7 @@ func Test_Service_Error(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -427,7 +427,7 @@ func Test_Service_Error2(t *testing.T) {
assert.NoError(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -456,7 +456,7 @@ func Test_Service_Error3(t *testing.T) {
assert.Error(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -483,7 +483,7 @@ func Test_Service_Error4(t *testing.T) {
assert.Error(t, c.Init(&testCfg{httpCfg: `{
"enable": true,
"address": "----",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go
index d452f834..0fbf0e14 100644
--- a/service/http/uploads_test.go
+++ b/service/http/uploads_test.go
@@ -20,7 +20,7 @@ import (
func TestHandler_Upload_File(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -81,7 +81,7 @@ func TestHandler_Upload_File(t *testing.T) {
func TestHandler_Upload_NestedFile(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{},
@@ -142,7 +142,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: "-----",
Forbid: []string{},
@@ -203,7 +203,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
func TestHandler_Upload_File_Forbids(t *testing.T) {
h := &Handler{
cfg: &Config{
- MaxRequest: 1024,
+ MaxRequestSize: 1024,
Uploads: &UploadsConfig{
Dir: os.TempDir(),
Forbid: []string{".go"},
diff --git a/service/rpc/service.go b/service/rpc/service.go
index 0b957976..ea262615 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -3,6 +3,7 @@ package rpc
import (
"errors"
"github.com/spiral/goridge"
+ "github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/env"
"net/rpc"
"sync"
@@ -11,7 +12,7 @@ import (
// ID contains default service name.
const ID = "rpc"
-// Service is RPC service.
+// Services is RPC service.
type Service struct {
cfg *Config
stop chan interface{}
@@ -21,7 +22,7 @@ type Service struct {
}
// Init rpc service. Must return true if service is enabled.
-func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) {
+func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) {
if !cfg.Enable {
return false, nil
}
@@ -33,6 +34,10 @@ func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) {
env.SetEnv("RR_RPC", cfg.Listen)
}
+ if err := s.Register("system", &systemService{c}); err != nil {
+ return false, err
+ }
+
return true, nil
}
@@ -78,7 +83,7 @@ func (s *Service) Serve() error {
return nil
}
-// Stop stops the service.
+// Detach stops the service.
func (s *Service) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
index 0278d287..ee87509a 100644
--- a/service/rpc/service_test.go
+++ b/service/rpc/service_test.go
@@ -1,6 +1,7 @@
package rpc
import (
+ "github.com/spiral/roadrunner/service"
"github.com/spiral/roadrunner/service/env"
"github.com/stretchr/testify/assert"
"testing"
@@ -13,7 +14,7 @@ func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil
func Test_Disabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: false}, nil)
+ ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.False(t, ok)
@@ -31,7 +32,7 @@ func Test_RegisterNotConfigured(t *testing.T) {
func Test_Enabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -39,7 +40,7 @@ func Test_Enabled(t *testing.T) {
func Test_StopNonServing(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -48,7 +49,7 @@ func Test_StopNonServing(t *testing.T) {
func Test_Serve_Errors(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -61,7 +62,7 @@ func Test_Serve_Errors(t *testing.T) {
func Test_Serve_Client(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil)
assert.NoError(t, err)
assert.True(t, ok)
@@ -85,7 +86,7 @@ func Test_Serve_Client(t *testing.T) {
func TestSetEnv(t *testing.T) {
s := &Service{}
e := env.NewService(map[string]string{})
- ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, e)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e)
assert.NoError(t, err)
assert.True(t, ok)
diff --git a/service/rpc/system.go b/service/rpc/system.go
new file mode 100644
index 00000000..d1368a05
--- /dev/null
+++ b/service/rpc/system.go
@@ -0,0 +1,18 @@
+package rpc
+
+import "github.com/spiral/roadrunner/service"
+
+// systemService service controls roadrunner server.
+type systemService struct {
+ c service.Container
+}
+
+// Detach the underlying c.
+func (s *systemService) Stop(stop bool, r *string) error {
+ if stop {
+ s.c.Stop()
+ }
+ *r = "OK"
+
+ return nil
+}
diff --git a/service/static/service.go b/service/static/service.go
index b824e787..679033f2 100644
--- a/service/static/service.go
+++ b/service/static/service.go
@@ -9,7 +9,7 @@ import (
// ID contains default service name.
const ID = "static"
-// Service serves static files. Potentially convert into middleware?
+// Services serves static files. Potentially convert into middleware?
type Service struct {
// server configuration (location, forbidden files and etc)
cfg *Config
diff --git a/service/static/service_test.go b/service/static/service_test.go
index af616418..d69b2fdd 100644
--- a/service/static/service_test.go
+++ b/service/static/service_test.go
@@ -60,7 +60,7 @@ func Test_Files(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -113,7 +113,7 @@ func Test_Files_Disable(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -150,7 +150,7 @@ func Test_Files_Error(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -180,7 +180,7 @@ func Test_Files_Error2(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -210,7 +210,7 @@ func Test_Files_Forbid(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -247,7 +247,7 @@ func Test_Files_Always(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -284,7 +284,7 @@ func Test_Files_NotFound(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -321,7 +321,7 @@ func Test_Files_Dir(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
@@ -358,7 +358,7 @@ func Test_Files_NotForbid(t *testing.T) {
httpCfg: `{
"enable": true,
"address": ":6029",
- "maxRequest": 1024,
+ "maxRequestSize": 1024,
"uploads": {
"dir": ` + tmpDir() + `,
"forbid": []
diff --git a/service/watcher/config.go b/service/watcher/config.go
new file mode 100644
index 00000000..74be517a
--- /dev/null
+++ b/service/watcher/config.go
@@ -0,0 +1,48 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+ "time"
+)
+
+// Configures set of Services.
+type Config struct {
+ // Interval defines the update duration for underlying watchers, default 1s.
+ Interval time.Duration
+
+ // Services declares list of services to be watched.
+ Services map[string]*watcherConfig
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ // Always use second based definition for time durations
+ if c.Interval < time.Microsecond {
+ c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds())
+ }
+
+ return nil
+}
+
+// InitDefaults sets missing values to their default values.
+func (c *Config) InitDefaults() error {
+ c.Interval = time.Second
+
+ return nil
+}
+
+// Watchers returns list of defined Services
+func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) {
+ watchers = make(map[string]roadrunner.Watcher)
+
+ for name, cfg := range c.Services {
+ watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg}
+ }
+
+ return watchers
+}
diff --git a/service/watcher/service.go b/service/watcher/service.go
new file mode 100644
index 00000000..c81ff3f5
--- /dev/null
+++ b/service/watcher/service.go
@@ -0,0 +1,46 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
+)
+
+// ID defines watcher service name.
+const ID = "watch"
+
+// Watchable defines the ability to attach roadrunner watcher.
+type Watchable interface {
+ // Watch attaches watcher to the service.
+ Watch(w roadrunner.Watcher)
+}
+
+// Services to watch the state of roadrunner service inside other services.
+type Service struct {
+ cfg *Config
+ lsns []func(event int, ctx interface{})
+}
+
+// Init watcher service
+func (s *Service) Init(cfg *Config, c service.Container) (bool, error) {
+ // mount Services to designated services
+ for id, watcher := range cfg.Watchers(s.throw) {
+ svc, _ := c.Get(id)
+ if watchable, ok := svc.(Watchable); ok {
+ watchable.Watch(watcher)
+ }
+ }
+
+ return true, nil
+}
+
+// AddListener attaches server event watcher.
+func (s *Service) AddListener(l func(event int, ctx interface{})) {
+ s.lsns = append(s.lsns, l)
+}
+
+// throw handles service, server and pool events.
+func (s *Service) throw(event int, ctx interface{}) {
+ for _, l := range s.lsns {
+ l(event, ctx)
+ }
+}
diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go
new file mode 100644
index 00000000..3090d15d
--- /dev/null
+++ b/service/watcher/state_watch.go
@@ -0,0 +1,58 @@
+package watcher
+
+import (
+ "github.com/spiral/roadrunner"
+ "time"
+)
+
+type stateWatcher struct {
+ prev map[*roadrunner.Worker]state
+ next map[*roadrunner.Worker]state
+}
+
+type state struct {
+ state int64
+ numExecs int64
+ since time.Time
+}
+
+func newStateWatcher() *stateWatcher {
+ return &stateWatcher{
+ prev: make(map[*roadrunner.Worker]state),
+ next: make(map[*roadrunner.Worker]state),
+ }
+}
+
+// add new worker to be watched
+func (sw *stateWatcher) push(w *roadrunner.Worker) {
+ sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()}
+}
+
+// update worker states.
+func (sw *stateWatcher) sync(t time.Time) {
+ for w := range sw.prev {
+ if _, ok := sw.next[w]; !ok {
+ delete(sw.prev, w)
+ }
+ }
+
+ for w, s := range sw.next {
+ ps, ok := sw.prev[w]
+ if !ok || ps.state != s.state || ps.numExecs != s.numExecs {
+ sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t}
+ }
+
+ delete(sw.next, w)
+ }
+}
+
+// find all workers which spend given amount of time in a specific state.
+func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) {
+ for w, s := range sw.prev {
+ if s.state == state && s.since.Before(since) {
+ workers = append(workers, w)
+ }
+ }
+
+ return
+}
diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go
new file mode 100644
index 00000000..08d477fa
--- /dev/null
+++ b/service/watcher/watcher.go
@@ -0,0 +1,153 @@
+package watcher
+
+import (
+ "fmt"
+ "github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/util"
+ "time"
+)
+
+const (
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory = iota + 8000
+
+ // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError
+ EventMaxTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time at rest.
+ EventMaxIdleTTL
+
+ // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventMaxExecTTL
+)
+
+// handles watcher events
+type listener func(event int, ctx interface{})
+
+// defines the watcher behaviour
+type watcherConfig struct {
+ // MaxMemory defines maximum amount of memory allowed for worker. In megabytes.
+ MaxMemory uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // MaxIdleTTL defines maximum duration worker can spend in idle mode.
+ MaxIdleTTL int64
+
+ // MaxExecTTL defines maximum lifetime per job.
+ MaxExecTTL int64
+}
+
+type watcher struct {
+ lsn listener
+ tick time.Duration
+ cfg *watcherConfig
+
+ // list of workers which are currently working
+ sw *stateWatcher
+
+ stop chan interface{}
+}
+
+// watch the pool state
+func (wch *watcher) watch(p roadrunner.Pool) {
+ now := time.Now()
+
+ for _, w := range p.Workers() {
+ if w.State().Value() == roadrunner.StateInvalid {
+ // skip duplicate assessment
+ continue
+ }
+
+ s, err := util.WorkerState(w)
+ if err != nil {
+ continue
+ }
+
+ if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) {
+ err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxTTL, w, err)
+ }
+ continue
+ }
+
+ if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 {
+ err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory)
+ if p.Remove(w, err) {
+ wch.report(EventMaxMemory, w, err)
+ }
+ continue
+ }
+
+ // watch the worker state changes
+ wch.sw.push(w)
+ }
+
+ wch.sw.sync(now)
+
+ if wch.cfg.MaxExecTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateWorking,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)),
+ ) {
+ err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL)
+ if p.Remove(w, err) {
+ // brutally
+ go w.Kill()
+ wch.report(EventMaxExecTTL, w, err)
+ }
+ }
+ }
+
+ // locale workers which are in idle mode for too long
+ if wch.cfg.MaxIdleTTL != 0 {
+ for _, w := range wch.sw.find(
+ roadrunner.StateReady,
+ now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)),
+ ) {
+ err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL)
+ if p.Remove(w, err) {
+ wch.report(EventMaxIdleTTL, w, err)
+ }
+ }
+ }
+}
+
+// throw watcher event
+func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) {
+ if wch.lsn != nil {
+ wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused})
+ }
+}
+
+// Attach watcher to the pool
+func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher {
+ wp := &watcher{
+ tick: wch.tick,
+ lsn: wch.lsn,
+ cfg: wch.cfg,
+ sw: newStateWatcher(),
+ stop: make(chan interface{}),
+ }
+
+ go func(wp *watcher, pool roadrunner.Pool) {
+ ticker := time.NewTicker(wp.tick)
+ for {
+ select {
+ case <-ticker.C:
+ wp.watch(pool)
+ case <-wp.stop:
+ return
+ }
+ }
+ }(wp, pool)
+
+ return wp
+}
+
+// Detach watcher from the pool.
+func (wch *watcher) Detach() {
+ close(wch.stop)
+}