diff options
Diffstat (limited to 'service')
-rw-r--r-- | service/container.go | 6 | ||||
-rw-r--r-- | service/env/service.go | 2 | ||||
-rw-r--r-- | service/http/config.go | 4 | ||||
-rw-r--r-- | service/http/config_test.go | 30 | ||||
-rw-r--r-- | service/http/handler.go | 4 | ||||
-rw-r--r-- | service/http/handler_test.go | 50 | ||||
-rw-r--r-- | service/http/rpc.go | 4 | ||||
-rw-r--r-- | service/http/rpc_test.go | 6 | ||||
-rw-r--r-- | service/http/service.go | 23 | ||||
-rw-r--r-- | service/http/service_test.go | 20 | ||||
-rw-r--r-- | service/http/uploads_test.go | 8 | ||||
-rw-r--r-- | service/rpc/service.go | 11 | ||||
-rw-r--r-- | service/rpc/service_test.go | 13 | ||||
-rw-r--r-- | service/rpc/system.go | 18 | ||||
-rw-r--r-- | service/static/service.go | 2 | ||||
-rw-r--r-- | service/static/service_test.go | 18 | ||||
-rw-r--r-- | service/watcher/config.go | 48 | ||||
-rw-r--r-- | service/watcher/service.go | 46 | ||||
-rw-r--r-- | service/watcher/state_watch.go | 58 | ||||
-rw-r--r-- | service/watcher/watcher.go | 153 |
20 files changed, 436 insertions, 88 deletions
diff --git a/service/container.go b/service/container.go index 275cfffd..abeaf369 100644 --- a/service/container.go +++ b/service/container.go @@ -16,13 +16,13 @@ var errNoConfig = fmt.Errorf("no config has been provided") // implement service.HydrateConfig. const InitMethod = "Init" -// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// Services can serve. Services can provide Init method which must return (bool, error) signature and might accept // other services and/or configs as dependency. type Service interface { // Serve serves. Serve() error - // Stop stops the service. + // Detach stops the service. Stop() } @@ -198,7 +198,7 @@ func (c *container) Serve() error { return nil } -// Stop sends stop command to all running services. +// Detach sends stop command to all running services. func (c *container) Stop() { for _, e := range c.services { if e.hasStatus(StatusServing) { diff --git a/service/env/service.go b/service/env/service.go index 83175b36..00bc41ef 100644 --- a/service/env/service.go +++ b/service/env/service.go @@ -3,7 +3,7 @@ package env // ID contains default service name. const ID = "env" -// Service provides ability to map _ENV values from config file. +// Services provides ability to map _ENV values from config file. type Service struct { // values is default set of values. values map[string]string diff --git a/service/http/config.go b/service/http/config.go index 5a2c8768..899a5083 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -17,8 +17,8 @@ type Config struct { // SSL defines https server options. SSL SSLConfig - // MaxRequest specified max size for payload body in megabytes, set 0 to unlimited. - MaxRequest int64 + // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. + MaxRequestSize int64 // Uploads configures uploads configuration. Uploads *UploadsConfig diff --git a/service/http/config_test.go b/service/http/config_test.go index 07901cb6..4cd2783f 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -31,8 +31,8 @@ func Test_Config_Hydrate_Error2(t *testing.T) { func Test_Config_Valid(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -58,7 +58,7 @@ func Test_Config_Valid_SSL(t *testing.T) { Cert: "fixtures/server.crt", Key: "fixtures/server.key", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -87,7 +87,7 @@ func Test_Config_SSL_No_key(t *testing.T) { SSL: SSLConfig{ Cert: "fixtures/server.crt", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -112,7 +112,7 @@ func Test_Config_SSL_No_Cert(t *testing.T) { SSL: SSLConfig{ Key: "fixtures/server.key", }, - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -133,8 +133,8 @@ func Test_Config_SSL_No_Cert(t *testing.T) { func Test_Config_NoUploads(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Workers: &roadrunner.ServerConfig{ Command: "php tests/client.php echo pipes", Relay: "pipes", @@ -151,8 +151,8 @@ func Test_Config_NoUploads(t *testing.T) { func Test_Config_NoWorkers(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -164,8 +164,8 @@ func Test_Config_NoWorkers(t *testing.T) { func Test_Config_NoPool(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -186,8 +186,8 @@ func Test_Config_NoPool(t *testing.T) { func Test_Config_DeadPool(t *testing.T) { cfg := &Config{ - Address: ":8080", - MaxRequest: 1024, + Address: ":8080", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, @@ -203,8 +203,8 @@ func Test_Config_DeadPool(t *testing.T) { func Test_Config_InvalidAddress(t *testing.T) { cfg := &Config{ - Address: "", - MaxRequest: 1024, + Address: "", + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, diff --git a/service/http/handler.go b/service/http/handler.go index 8cebc42a..a7a6d4d0 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -75,12 +75,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() // validating request size - if h.cfg.MaxRequest != 0 { + if h.cfg.MaxRequestSize != 0 { if length := r.Header.Get("content-length"); length != "" { if size, err := strconv.ParseInt(length, 10, 64); err != nil { h.handleError(w, r, err, start) return - } else if size > h.cfg.MaxRequest*1024*1024 { + } else if size > h.cfg.MaxRequestSize*1024*1024 { h.handleError(w, r, errors.New("request body max size is exceeded"), start) return } diff --git a/service/http/handler_test.go b/service/http/handler_test.go index d876ef8e..5d4f7659 100644 --- a/service/http/handler_test.go +++ b/service/http/handler_test.go @@ -32,7 +32,7 @@ func get(url string) (string, *http.Response, error) { func TestHandler_Echo(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -67,7 +67,7 @@ func TestHandler_Echo(t *testing.T) { func Test_HandlerErrors(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -94,7 +94,7 @@ func Test_HandlerErrors(t *testing.T) { func Test_Handler_JSON_error(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -123,7 +123,7 @@ func Test_Handler_JSON_error(t *testing.T) { func TestHandler_Headers(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -170,7 +170,7 @@ func TestHandler_Headers(t *testing.T) { func TestHandler_Empty_User_Agent(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -216,7 +216,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) { func TestHandler_User_Agent(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -262,7 +262,7 @@ func TestHandler_User_Agent(t *testing.T) { func TestHandler_Cookies(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -313,7 +313,7 @@ func TestHandler_Cookies(t *testing.T) { func TestHandler_JsonPayload_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -363,7 +363,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) { func TestHandler_JsonPayload_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -409,7 +409,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) { func TestHandler_JsonPayload_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -455,7 +455,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) { func TestHandler_FormData_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -513,7 +513,7 @@ func TestHandler_FormData_POST(t *testing.T) { func TestHandler_FormData_POST_Overwrite(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -565,7 +565,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) { func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -623,7 +623,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) { func TestHandler_FormData_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -681,7 +681,7 @@ func TestHandler_FormData_PUT(t *testing.T) { func TestHandler_FormData_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -739,7 +739,7 @@ func TestHandler_FormData_PATCH(t *testing.T) { func TestHandler_Multipart_POST(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -801,7 +801,7 @@ func TestHandler_Multipart_POST(t *testing.T) { func TestHandler_Multipart_PUT(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -863,7 +863,7 @@ func TestHandler_Multipart_PUT(t *testing.T) { func TestHandler_Multipart_PATCH(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -925,7 +925,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) { func TestHandler_Error(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -959,7 +959,7 @@ func TestHandler_Error(t *testing.T) { func TestHandler_Error2(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -993,7 +993,7 @@ func TestHandler_Error2(t *testing.T) { func TestHandler_Error3(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1, + MaxRequestSize: 1, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1038,7 +1038,7 @@ func TestHandler_Error3(t *testing.T) { func TestHandler_ResponseDuration(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1087,7 +1087,7 @@ func TestHandler_ResponseDuration(t *testing.T) { func TestHandler_ResponseDurationDelayed(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1136,7 +1136,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) { func TestHandler_ErrorDuration(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -1184,7 +1184,7 @@ func TestHandler_ErrorDuration(t *testing.T) { func BenchmarkHandler_Listen_Echo(b *testing.B) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, diff --git a/service/http/rpc.go b/service/http/rpc.go index 3390a93d..7b38dece 100644 --- a/service/http/rpc.go +++ b/service/http/rpc.go @@ -20,7 +20,7 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error { } *r = "OK" - return rpc.svc.rr.Reset() + return rpc.svc.Server().Reset() } // Workers returns list of active workers and their stats. @@ -29,6 +29,6 @@ func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) { return errors.New("http server is not running") } - r.Workers, err = util.ServerState(rpc.svc.rr) + r.Workers, err = util.ServerState(rpc.svc.Server()) return err } diff --git a/service/http/rpc_test.go b/service/http/rpc_test.go index ba3efd2e..669b201c 100644 --- a/service/http/rpc_test.go +++ b/service/http/rpc_test.go @@ -27,7 +27,7 @@ func Test_RPC(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -88,7 +88,7 @@ func Test_RPC_Unix(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -142,7 +142,7 @@ func Test_Workers(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/http/service.go b/service/http/service.go index ad59f887..651284b4 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -25,7 +25,7 @@ const ( // http middleware type. type middleware func(f http.HandlerFunc) http.HandlerFunc -// Service manages rr, http servers. +// Services manages rr, http servers. type Service struct { cfg *Config env env.Environment @@ -33,11 +33,17 @@ type Service struct { mdwr []middleware mu sync.Mutex rr *roadrunner.Server + watcher roadrunner.Watcher handler *Handler http *http.Server https *http.Server } +// Watch attaches watcher. +func (s *Service) Watch(w roadrunner.Watcher) { + s.watcher = w +} + // AddMiddleware adds new net/http mdwr. func (s *Service) AddMiddleware(m middleware) { s.mdwr = append(s.mdwr, m) @@ -53,6 +59,7 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { func (s *Service) Init(cfg *Config, r *rpc.Service, e env.Environment) (bool, error) { s.cfg = cfg s.env = e + if r != nil { if err := r.Register(ID, &rpcServer{s}); err != nil { return false, err @@ -77,6 +84,10 @@ func (s *Service) Serve() error { s.rr = roadrunner.NewServer(s.cfg.Workers) s.rr.Listen(s.throw) + if s.watcher != nil { + s.rr.Watch(s.watcher) + } + s.handler = &Handler{cfg: s.cfg, rr: s.rr} s.handler.Listen(s.throw) @@ -102,7 +113,7 @@ func (s *Service) Serve() error { return <-err } -// Stop stops the svc. +// Detach stops the svc. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() @@ -117,6 +128,14 @@ func (s *Service) Stop() { go s.http.Shutdown(context.Background()) } +// Server returns associated roadrunner server (if any). +func (s *Service) Server() *roadrunner.Server { + s.mu.Lock() + defer s.mu.Unlock() + + return s.rr +} + // ServeHTTP handles connection using set of middleware and rr PSR-7 server. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect { diff --git a/service/http/service_test.go b/service/http/service_test.go index d1d601dc..5b6d60d8 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -84,7 +84,7 @@ func Test_Service_Configure_Enable(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":8070", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -115,7 +115,7 @@ func Test_Service_Echo(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -168,7 +168,7 @@ func Test_Service_Env(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -220,7 +220,7 @@ func Test_Service_ErrorEcho(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -280,7 +280,7 @@ func Test_Service_Middleware(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -354,7 +354,7 @@ func Test_Service_Listener(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -398,7 +398,7 @@ func Test_Service_Error(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -427,7 +427,7 @@ func Test_Service_Error2(t *testing.T) { assert.NoError(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -456,7 +456,7 @@ func Test_Service_Error3(t *testing.T) { assert.Error(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -483,7 +483,7 @@ func Test_Service_Error4(t *testing.T) { assert.Error(t, c.Init(&testCfg{httpCfg: `{ "enable": true, "address": "----", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/http/uploads_test.go b/service/http/uploads_test.go index d452f834..0fbf0e14 100644 --- a/service/http/uploads_test.go +++ b/service/http/uploads_test.go @@ -20,7 +20,7 @@ import ( func TestHandler_Upload_File(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -81,7 +81,7 @@ func TestHandler_Upload_File(t *testing.T) { func TestHandler_Upload_NestedFile(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{}, @@ -142,7 +142,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) { func TestHandler_Upload_File_NoTmpDir(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: "-----", Forbid: []string{}, @@ -203,7 +203,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) { func TestHandler_Upload_File_Forbids(t *testing.T) { h := &Handler{ cfg: &Config{ - MaxRequest: 1024, + MaxRequestSize: 1024, Uploads: &UploadsConfig{ Dir: os.TempDir(), Forbid: []string{".go"}, diff --git a/service/rpc/service.go b/service/rpc/service.go index 0b957976..ea262615 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -3,6 +3,7 @@ package rpc import ( "errors" "github.com/spiral/goridge" + "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/env" "net/rpc" "sync" @@ -11,7 +12,7 @@ import ( // ID contains default service name. const ID = "rpc" -// Service is RPC service. +// Services is RPC service. type Service struct { cfg *Config stop chan interface{} @@ -21,7 +22,7 @@ type Service struct { } // Init rpc service. Must return true if service is enabled. -func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) { +func (s *Service) Init(cfg *Config, c service.Container, env env.Environment) (bool, error) { if !cfg.Enable { return false, nil } @@ -33,6 +34,10 @@ func (s *Service) Init(cfg *Config, env env.Environment) (bool, error) { env.SetEnv("RR_RPC", cfg.Listen) } + if err := s.Register("system", &systemService{c}); err != nil { + return false, err + } + return true, nil } @@ -78,7 +83,7 @@ func (s *Service) Serve() error { return nil } -// Stop stops the service. +// Detach stops the service. func (s *Service) Stop() { s.mu.Lock() defer s.mu.Unlock() diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go index 0278d287..ee87509a 100644 --- a/service/rpc/service_test.go +++ b/service/rpc/service_test.go @@ -1,6 +1,7 @@ package rpc import ( + "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/env" "github.com/stretchr/testify/assert" "testing" @@ -13,7 +14,7 @@ func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil func Test_Disabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: false}, nil) + ok, err := s.Init(&Config{Enable: false}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.False(t, ok) @@ -31,7 +32,7 @@ func Test_RegisterNotConfigured(t *testing.T) { func Test_Enabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -39,7 +40,7 @@ func Test_Enabled(t *testing.T) { func Test_StopNonServing(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -48,7 +49,7 @@ func Test_StopNonServing(t *testing.T) { func Test_Serve_Errors(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -61,7 +62,7 @@ func Test_Serve_Errors(t *testing.T) { func Test_Serve_Client(t *testing.T) { s := &Service{} - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), nil) assert.NoError(t, err) assert.True(t, ok) @@ -85,7 +86,7 @@ func Test_Serve_Client(t *testing.T) { func TestSetEnv(t *testing.T) { s := &Service{} e := env.NewService(map[string]string{}) - ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, e) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}, service.NewContainer(nil), e) assert.NoError(t, err) assert.True(t, ok) diff --git a/service/rpc/system.go b/service/rpc/system.go new file mode 100644 index 00000000..d1368a05 --- /dev/null +++ b/service/rpc/system.go @@ -0,0 +1,18 @@ +package rpc + +import "github.com/spiral/roadrunner/service" + +// systemService service controls roadrunner server. +type systemService struct { + c service.Container +} + +// Detach the underlying c. +func (s *systemService) Stop(stop bool, r *string) error { + if stop { + s.c.Stop() + } + *r = "OK" + + return nil +} diff --git a/service/static/service.go b/service/static/service.go index b824e787..679033f2 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -9,7 +9,7 @@ import ( // ID contains default service name. const ID = "static" -// Service serves static files. Potentially convert into middleware? +// Services serves static files. Potentially convert into middleware? type Service struct { // server configuration (location, forbidden files and etc) cfg *Config diff --git a/service/static/service_test.go b/service/static/service_test.go index af616418..d69b2fdd 100644 --- a/service/static/service_test.go +++ b/service/static/service_test.go @@ -60,7 +60,7 @@ func Test_Files(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -113,7 +113,7 @@ func Test_Files_Disable(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -150,7 +150,7 @@ func Test_Files_Error(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -180,7 +180,7 @@ func Test_Files_Error2(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -210,7 +210,7 @@ func Test_Files_Forbid(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -247,7 +247,7 @@ func Test_Files_Always(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -284,7 +284,7 @@ func Test_Files_NotFound(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -321,7 +321,7 @@ func Test_Files_Dir(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] @@ -358,7 +358,7 @@ func Test_Files_NotForbid(t *testing.T) { httpCfg: `{ "enable": true, "address": ":6029", - "maxRequest": 1024, + "maxRequestSize": 1024, "uploads": { "dir": ` + tmpDir() + `, "forbid": [] diff --git a/service/watcher/config.go b/service/watcher/config.go new file mode 100644 index 00000000..74be517a --- /dev/null +++ b/service/watcher/config.go @@ -0,0 +1,48 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" + "time" +) + +// Configures set of Services. +type Config struct { + // Interval defines the update duration for underlying watchers, default 1s. + Interval time.Duration + + // Services declares list of services to be watched. + Services map[string]*watcherConfig +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + // Always use second based definition for time durations + if c.Interval < time.Microsecond { + c.Interval = time.Second * time.Duration(c.Interval.Nanoseconds()) + } + + return nil +} + +// InitDefaults sets missing values to their default values. +func (c *Config) InitDefaults() error { + c.Interval = time.Second + + return nil +} + +// Watchers returns list of defined Services +func (c *Config) Watchers(l listener) (watchers map[string]roadrunner.Watcher) { + watchers = make(map[string]roadrunner.Watcher) + + for name, cfg := range c.Services { + watchers[name] = &watcher{lsn: l, tick: c.Interval, cfg: cfg} + } + + return watchers +} diff --git a/service/watcher/service.go b/service/watcher/service.go new file mode 100644 index 00000000..c81ff3f5 --- /dev/null +++ b/service/watcher/service.go @@ -0,0 +1,46 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" +) + +// ID defines watcher service name. +const ID = "watch" + +// Watchable defines the ability to attach roadrunner watcher. +type Watchable interface { + // Watch attaches watcher to the service. + Watch(w roadrunner.Watcher) +} + +// Services to watch the state of roadrunner service inside other services. +type Service struct { + cfg *Config + lsns []func(event int, ctx interface{}) +} + +// Init watcher service +func (s *Service) Init(cfg *Config, c service.Container) (bool, error) { + // mount Services to designated services + for id, watcher := range cfg.Watchers(s.throw) { + svc, _ := c.Get(id) + if watchable, ok := svc.(Watchable); ok { + watchable.Watch(watcher) + } + } + + return true, nil +} + +// AddListener attaches server event watcher. +func (s *Service) AddListener(l func(event int, ctx interface{})) { + s.lsns = append(s.lsns, l) +} + +// throw handles service, server and pool events. +func (s *Service) throw(event int, ctx interface{}) { + for _, l := range s.lsns { + l(event, ctx) + } +} diff --git a/service/watcher/state_watch.go b/service/watcher/state_watch.go new file mode 100644 index 00000000..3090d15d --- /dev/null +++ b/service/watcher/state_watch.go @@ -0,0 +1,58 @@ +package watcher + +import ( + "github.com/spiral/roadrunner" + "time" +) + +type stateWatcher struct { + prev map[*roadrunner.Worker]state + next map[*roadrunner.Worker]state +} + +type state struct { + state int64 + numExecs int64 + since time.Time +} + +func newStateWatcher() *stateWatcher { + return &stateWatcher{ + prev: make(map[*roadrunner.Worker]state), + next: make(map[*roadrunner.Worker]state), + } +} + +// add new worker to be watched +func (sw *stateWatcher) push(w *roadrunner.Worker) { + sw.next[w] = state{state: w.State().Value(), numExecs: w.State().NumExecs()} +} + +// update worker states. +func (sw *stateWatcher) sync(t time.Time) { + for w := range sw.prev { + if _, ok := sw.next[w]; !ok { + delete(sw.prev, w) + } + } + + for w, s := range sw.next { + ps, ok := sw.prev[w] + if !ok || ps.state != s.state || ps.numExecs != s.numExecs { + sw.prev[w] = state{state: s.state, numExecs: s.numExecs, since: t} + } + + delete(sw.next, w) + } +} + +// find all workers which spend given amount of time in a specific state. +func (sw *stateWatcher) find(state int64, since time.Time) (workers []*roadrunner.Worker) { + for w, s := range sw.prev { + if s.state == state && s.since.Before(since) { + workers = append(workers, w) + } + } + + return +} diff --git a/service/watcher/watcher.go b/service/watcher/watcher.go new file mode 100644 index 00000000..08d477fa --- /dev/null +++ b/service/watcher/watcher.go @@ -0,0 +1,153 @@ +package watcher + +import ( + "fmt" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/util" + "time" +) + +const ( + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory = iota + 8000 + + // EventMaxTTL thrown when worker is removed due TTL being reached. Context is roadrunner.WorkerError + EventMaxTTL + + // EventMaxIdleTTL triggered when worker spends too much time at rest. + EventMaxIdleTTL + + // EventMaxIdleTTL triggered when worker spends too much time doing the task (max_execution_time). + EventMaxExecTTL +) + +// handles watcher events +type listener func(event int, ctx interface{}) + +// defines the watcher behaviour +type watcherConfig struct { + // MaxMemory defines maximum amount of memory allowed for worker. In megabytes. + MaxMemory uint64 + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // MaxIdleTTL defines maximum duration worker can spend in idle mode. + MaxIdleTTL int64 + + // MaxExecTTL defines maximum lifetime per job. + MaxExecTTL int64 +} + +type watcher struct { + lsn listener + tick time.Duration + cfg *watcherConfig + + // list of workers which are currently working + sw *stateWatcher + + stop chan interface{} +} + +// watch the pool state +func (wch *watcher) watch(p roadrunner.Pool) { + now := time.Now() + + for _, w := range p.Workers() { + if w.State().Value() == roadrunner.StateInvalid { + // skip duplicate assessment + continue + } + + s, err := util.WorkerState(w) + if err != nil { + continue + } + + if wch.cfg.TTL != 0 && now.Sub(w.Created).Seconds() >= float64(wch.cfg.TTL) { + err := fmt.Errorf("max TTL reached (%vs)", wch.cfg.TTL) + if p.Remove(w, err) { + wch.report(EventMaxTTL, w, err) + } + continue + } + + if wch.cfg.MaxMemory != 0 && s.MemoryUsage >= wch.cfg.MaxMemory*1024*1024 { + err := fmt.Errorf("max allowed memory reached (%vMB)", wch.cfg.MaxMemory) + if p.Remove(w, err) { + wch.report(EventMaxMemory, w, err) + } + continue + } + + // watch the worker state changes + wch.sw.push(w) + } + + wch.sw.sync(now) + + if wch.cfg.MaxExecTTL != 0 { + for _, w := range wch.sw.find( + roadrunner.StateWorking, + now.Add(-time.Second*time.Duration(wch.cfg.MaxExecTTL)), + ) { + err := fmt.Errorf("max exec time reached (%vs)", wch.cfg.MaxExecTTL) + if p.Remove(w, err) { + // brutally + go w.Kill() + wch.report(EventMaxExecTTL, w, err) + } + } + } + + // locale workers which are in idle mode for too long + if wch.cfg.MaxIdleTTL != 0 { + for _, w := range wch.sw.find( + roadrunner.StateReady, + now.Add(-time.Second*time.Duration(wch.cfg.MaxIdleTTL)), + ) { + err := fmt.Errorf("max idle time reached (%vs)", wch.cfg.MaxIdleTTL) + if p.Remove(w, err) { + wch.report(EventMaxIdleTTL, w, err) + } + } + } +} + +// throw watcher event +func (wch *watcher) report(event int, worker *roadrunner.Worker, caused error) { + if wch.lsn != nil { + wch.lsn(event, roadrunner.WorkerError{Worker: worker, Caused: caused}) + } +} + +// Attach watcher to the pool +func (wch *watcher) Attach(pool roadrunner.Pool) roadrunner.Watcher { + wp := &watcher{ + tick: wch.tick, + lsn: wch.lsn, + cfg: wch.cfg, + sw: newStateWatcher(), + stop: make(chan interface{}), + } + + go func(wp *watcher, pool roadrunner.Pool) { + ticker := time.NewTicker(wp.tick) + for { + select { + case <-ticker.C: + wp.watch(pool) + case <-wp.stop: + return + } + } + }(wp, pool) + + return wp +} + +// Detach watcher from the pool. +func (wch *watcher) Detach() { + close(wch.stop) +} |