From 3c3a7801100f29c99a5e446646c818bf16ccd5f0 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 10:23:30 -0700 Subject: minor attributes refactoring --- cmd/rr/cmd/root.go | 2 +- service/container.go | 6 +-- service/http/attributes.go | 69 ---------------------------- service/http/attributes/attributes.go | 74 ++++++++++++++++++++++++++++++ service/http/attributes/attributes_test.go | 67 +++++++++++++++++++++++++++ service/http/attributes_test.go | 67 --------------------------- service/http/request.go | 3 +- service/http/service.go | 6 ++- 8 files changed, 151 insertions(+), 143 deletions(-) delete mode 100644 service/http/attributes.go create mode 100644 service/http/attributes/attributes.go create mode 100644 service/http/attributes/attributes_test.go delete mode 100644 service/http/attributes_test.go diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index 1a21cfc9..086f518c 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -58,7 +58,7 @@ type ViperWrapper struct { v *viper.Viper } -// Get nested config section (sub-map), returns nil if section not found. +// get nested config section (sub-map), returns nil if section not found. func (w *ViperWrapper) Get(key string) service.Config { sub := w.v.Sub(key) if sub == nil { diff --git a/service/container.go b/service/container.go index 0987b1ae..12c5a4a1 100644 --- a/service/container.go +++ b/service/container.go @@ -10,7 +10,7 @@ import ( // Config provides ability to slice configuration sections and unmarshal configuration data into // given structure. type Config interface { - // Get nested config section (sub-map), returns nil if section not found. + // get nested config section (sub-map), returns nil if section not found. Get(service string) Config // Unmarshal unmarshal config data into given struct. @@ -28,7 +28,7 @@ type Container interface { // Check if svc has been registered. Has(service string) bool - // Get returns svc instance by it's name or nil if svc not found. Method returns current service status + // get returns svc instance by it's name or nil if svc not found. Method returns current service status // as second value. Get(service string) (svc Service, status int) @@ -81,7 +81,7 @@ func (c *container) Has(target string) bool { return false } -// Get returns svc instance by it's name or nil if svc not found. +// get returns svc instance by it's name or nil if svc not found. func (c *container) Get(target string) (svc Service, status int) { c.mu.Lock() defer c.mu.Unlock() diff --git a/service/http/attributes.go b/service/http/attributes.go deleted file mode 100644 index acea38a1..00000000 --- a/service/http/attributes.go +++ /dev/null @@ -1,69 +0,0 @@ -package http - -import ( - "context" - "net/http" - "errors" -) - -const contextKey = "psr:attributes" - -type attrs map[string]interface{} - -// InitAttributes returns request with new context and attribute bag. -func InitAttributes(r *http.Request) *http.Request { - return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) -} - -// AllAttributes returns all context attributes. -func AllAttributes(r *http.Request) map[string]interface{} { - v := r.Context().Value(contextKey) - if v == nil { - return attrs{} - } - - return v.(attrs) -} - -// Get gets the value from request context. It replaces any existing -// values. -func GetAttribute(r *http.Request, key string) interface{} { - v := r.Context().Value(contextKey) - if v == nil { - return nil - } - - return v.(attrs).Get(key) -} - -// Set sets the key to value. It replaces any existing -// values. Context specific. -func SetAttribute(r *http.Request, key string, value interface{}) error { - v := r.Context().Value(contextKey) - if v == nil { - return errors.New("unable to find psr:attributes context value") - } - - v.(attrs).Set(key, value) - return nil -} - -// Get gets the value associated with the given key. -func (v attrs) Get(key string) interface{} { - if v == nil { - return "" - } - - return v[key] -} - -// Set sets the key to value. It replaces any existing -// values. -func (v attrs) Set(key string, value interface{}) { - v[key] = value -} - -// Del deletes the value associated with key. -func (v attrs) Del(key string) { - delete(v, key) -} diff --git a/service/http/attributes/attributes.go b/service/http/attributes/attributes.go new file mode 100644 index 00000000..94d0e9c1 --- /dev/null +++ b/service/http/attributes/attributes.go @@ -0,0 +1,74 @@ +package attributes + +import ( + "context" + "errors" + "net/http" +) + +const contextKey = "psr:attributes" + +type attrs map[string]interface{} + +func (v attrs) get(key string) interface{} { + if v == nil { + return "" + } + + return v[key] +} + +func (v attrs) set(key string, value interface{}) { + v[key] = value +} + +func (v attrs) del(key string) { + delete(v, key) +} + +// Init returns request with new context and attribute bag. +func Init(r *http.Request) *http.Request { + return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) +} + +// All returns all context attributes. +func All(r *http.Request) map[string]interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return attrs{} + } + + return v.(attrs) +} + +// get gets the value from request context. It replaces any existing +// values. +func Get(r *http.Request, key string) interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return nil + } + + return v.(attrs).get(key) +} + +// set sets the key to value. It replaces any existing +// values. Context specific. +func Set(r *http.Request, key string, value interface{}) error { + v := r.Context().Value(contextKey) + if v == nil { + return errors.New("unable to find `psr:attributes` context key") + } + + v.(attrs).set(key, value) + return nil +} + +// Delete deletes values associated with attribute key. +func (v attrs) Delete(key string) { + if v == nil { + return + } + + v.del(key) +} diff --git a/service/http/attributes/attributes_test.go b/service/http/attributes/attributes_test.go new file mode 100644 index 00000000..a71d6542 --- /dev/null +++ b/service/http/attributes/attributes_test.go @@ -0,0 +1,67 @@ +package attributes + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +func TestAllAttributes(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + + assert.Equal(t, All(r), map[string]interface{}{ + "key": "value", + }) +} + +func TestAllAttributesNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestAllAttributesNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestGetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), "value") +} + +func TestGetAttributeNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestGetAttributeNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestSetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), "value") +} + +func TestSetAttributeNone(t *testing.T) { + r := &http.Request{} + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), nil) +} diff --git a/service/http/attributes_test.go b/service/http/attributes_test.go deleted file mode 100644 index aeb7fe74..00000000 --- a/service/http/attributes_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package http - -import ( - "testing" - "net/http" - "github.com/stretchr/testify/assert" -) - -func TestAllAttributes(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - - assert.Equal(t, AllAttributes(r), map[string]interface{}{ - "key": "value", - }) -} - -func TestAllAttributesNone(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - assert.Equal(t, AllAttributes(r), map[string]interface{}{}) -} - -func TestAllAttributesNone2(t *testing.T) { - r := &http.Request{} - - assert.Equal(t, AllAttributes(r), map[string]interface{}{}) -} - -func TestGetAttribute(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), "value") -} - -func TestGetAttributeNone(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - assert.Equal(t, GetAttribute(r, "key"), nil) -} - -func TestGetAttributeNone2(t *testing.T) { - r := &http.Request{} - - assert.Equal(t, GetAttribute(r, "key"), nil) -} - -func TestSetAttribute(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), "value") -} - -func TestSetAttributeNone(t *testing.T) { - r := &http.Request{} - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), nil) -} \ No newline at end of file diff --git a/service/http/request.go b/service/http/request.go index 21566416..912843e9 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "github.com/spiral/roadrunner/service/http/attributes" ) const ( @@ -60,7 +61,7 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { Headers: r.Header, Cookies: make(map[string]string), RawQuery: r.URL.RawQuery, - Attributes: AllAttributes(r), + Attributes: attributes.All(r), } for _, c := range r.Cookies() { diff --git a/service/http/service.go b/service/http/service.go index 710cd60c..7405bf37 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -8,6 +8,7 @@ import ( "net/http" "sync" "sync/atomic" + "github.com/spiral/roadrunner/service/http/attributes" ) // ID contains default svc name. @@ -113,16 +114,17 @@ func (s *Service) Stop() { // middleware handles connection using set of mdws and rr PSR-7 server. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { - r = InitAttributes(r) + r = attributes.Init(r) + // chaining middlewares f := s.srv.ServeHTTP for _, m := range s.mdws { f = m(f) } - f(w, r) } +// listener handles service, server and pool events. func (s *Service) listener(event int, ctx interface{}) { for _, l := range s.lsns { l(event, ctx) -- cgit v1.2.3 From 29c9bf94350e86ec96f5ce5eeb476dfcd57302cd Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 13:06:05 -0700 Subject: dependency injection and lighter service Init methods. --- cmd/rr/http/reset.go | 2 +- cmd/rr/http/workers.go | 2 +- service/container.go | 79 ++++++++++++++++++++---------- service/container_test.go | 4 +- service/entry.go | 56 ++++++++++++++++++++++ service/http/config.go | 22 ++++++--- service/http/service.go | 25 ++-------- service/http/service_test.go | 14 +++--- service/injector.go | 112 +++++++++++++++++++++++++++++++++++++++++++ service/injector_test.go | 24 ++++++++++ service/rpc/config.go | 38 ++++++++++++--- service/rpc/config_test.go | 38 ++++++++------- service/rpc/service.go | 30 +++++------- service/rpc/service_test.go | 25 ++-------- service/service.go | 61 ----------------------- service/static/config.go | 34 ++++++++----- service/static/service.go | 30 ++---------- 17 files changed, 370 insertions(+), 226 deletions(-) create mode 100644 service/entry.go create mode 100644 service/injector.go create mode 100644 service/injector_test.go delete mode 100644 service/service.go diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go index 431b7e88..3bc089ec 100644 --- a/cmd/rr/http/reset.go +++ b/cmd/rr/http/reset.go @@ -39,7 +39,7 @@ func init() { func reloadHandler(cmd *cobra.Command, args []string) error { svc, st := rr.Container.Get(rpc.ID) - if st < service.StatusConfigured { + if st < service.StatusOK { return errors.New("RPC service is not configured") } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index e697816f..b03c273f 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -74,7 +74,7 @@ func workersHandler(cmd *cobra.Command, args []string) (err error) { }() svc, st := rr.Container.Get(rrpc.ID) - if st < service.StatusConfigured { + if st < service.StatusOK { return errors.New("RPC service is not configured") } diff --git a/service/container.go b/service/container.go index 12c5a4a1..a003b7e3 100644 --- a/service/container.go +++ b/service/container.go @@ -7,20 +7,10 @@ import ( "sync" ) -// Config provides ability to slice configuration sections and unmarshal configuration data into -// given structure. -type Config interface { - // get nested config section (sub-map), returns nil if section not found. - Get(service string) Config - - // Unmarshal unmarshal config data into given struct. - Unmarshal(out interface{}) error -} - // Container controls all internal RR services and provides plugin based system. type Container interface { // Register add new service to the container under given name. - Register(name string, service Service) + Register(name string, service interface{}) // Reconfigure configures all underlying services with given configuration. Init(cfg Config) error @@ -30,7 +20,7 @@ type Container interface { // get returns svc instance by it's name or nil if svc not found. Method returns current service status // as second value. - Get(service string) (svc Service, status int) + Get(service string) (svc interface{}, status int) // Serve all configured services. Non blocking. Serve() error @@ -39,6 +29,36 @@ type Container interface { Stop() } +// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// other services and/or configs as dependency. Container can be requested as well. Config can be requested in a form +// of service.Config or pointer to service specific config struct (automatically unmarshalled), config argument must +// implement service.HydrateConfig. +type Service interface { + // Serve serves. + Serve() error + + // Stop stops the service. + Stop() +} + +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// HydrateConfig provides ability to automatically hydrate config with values using +// service.Config as the source. +type HydrateConfig interface { + // Hydrate must populate config values using given config source. + // Must return error if config is not valid. + Hydrate(cfg Config) error +} + type container struct { log logrus.FieldLogger mu sync.Mutex @@ -54,7 +74,7 @@ func NewContainer(log logrus.FieldLogger) Container { } // Register add new service to the container under given name. -func (c *container) Register(name string, service Service) { +func (c *container) Register(name string, service interface{}) { c.mu.Lock() defer c.mu.Unlock() @@ -82,7 +102,7 @@ func (c *container) Has(target string) bool { } // get returns svc instance by it's name or nil if svc not found. -func (c *container) Get(target string) (svc Service, status int) { +func (c *container) Get(target string) (svc interface{}, status int) { c.mu.Lock() defer c.mu.Unlock() @@ -98,27 +118,32 @@ func (c *container) Get(target string) (svc Service, status int) { // Init configures all underlying services with given configuration. func (c *container) Init(cfg Config) error { for _, e := range c.services { - if e.getStatus() >= StatusConfigured { + if e.getStatus() >= StatusOK { return fmt.Errorf("service [%s] has already been configured", e.name) } - segment := cfg.Get(e.name) - if segment == nil { - c.log.Debugf("[%s]: no config has been provided", e.name) - continue - } + // inject service dependencies (todo: move to container) + if ok, err := initService(e.svc, cfg.Get(e.name), c); err != nil { + if err == noConfig { + c.log.Warningf("[%s]: no config has been provided", e.name) + + // unable to meet dependency requirements, skippingF + continue + } - ok, err := e.svc.Init(segment, c) - if err != nil { return errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) } else if ok { - e.setStatus(StatusConfigured) + e.setStatus(StatusOK) + c.log.Debugf("[%s]: initiated", e.name) + } else { + c.log.Debugf("[%s]: disabled", e.name) } } return nil } +//todo: refactor ???? // Serve all configured services. Non blocking. func (c *container) Serve() error { var ( @@ -127,7 +152,7 @@ func (c *container) Serve() error { ) for _, e := range c.services { - if e.hasStatus(StatusConfigured) { + if e.hasStatus(StatusOK) && e.canServe() { numServing++ } else { continue @@ -138,7 +163,7 @@ func (c *container) Serve() error { e.setStatus(StatusServing) defer e.setStatus(StatusStopped) - if err := e.svc.Serve(); err != nil { + if err := e.svc.(Service).Serve(); err != nil { c.log.Errorf("[%s]: %s", e.name, err) done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) } else { @@ -167,10 +192,12 @@ func (c *container) Serve() error { // Stop sends stop command to all running services. func (c *container) Stop() { + c.log.Debugf("received stop command") for _, e := range c.services { if e.hasStatus(StatusServing) { - e.svc.Stop() + e.svc.(Service).Stop() e.setStatus(StatusStopped) + c.log.Debugf("[%s]: stopped", e.name) } } diff --git a/service/container_test.go b/service/container_test.go index 3092be78..bf95cbd4 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -144,7 +144,7 @@ func TestContainer_Configure(t *testing.T) { s, st := c.Get("test") assert.IsType(t, &testService{}, s) - assert.Equal(t, StatusConfigured, st) + assert.Equal(t, StatusOK, st) } func TestContainer_ConfigureNull(t *testing.T) { @@ -176,7 +176,7 @@ func TestContainer_ConfigureDisabled(t *testing.T) { assert.Equal(t, 1, len(hook.Entries)) assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) - assert.Equal(t, 1, len(hook.Entries)) + assert.Equal(t, 2, len(hook.Entries)) s, st := c.Get("test") assert.IsType(t, &testService{}, s) diff --git a/service/entry.go b/service/entry.go new file mode 100644 index 00000000..f2cbac28 --- /dev/null +++ b/service/entry.go @@ -0,0 +1,56 @@ +package service + +import ( + "sync" +) + +const ( + // StatusUndefined when service bus can not find the service. + StatusUndefined = iota + + // StatusRegistered hasStatus setStatus when service has been registered in container. + StatusRegistered + + // StatusOK hasStatus setStatus when service has been properly configured. + StatusOK + + // StatusServing hasStatus setStatus when service hasStatus currently done. + StatusServing + + // StatusStopped hasStatus setStatus when service hasStatus stopped. + StatusStopped +) + +// entry creates association between service instance and given name. +type entry struct { + name string + svc interface{} + mu sync.Mutex + status int +} + +// status returns service status +func (e *entry) getStatus() int { + e.mu.Lock() + defer e.mu.Unlock() + + return e.status +} + +// setStarted indicates that service hasStatus status. +func (e *entry) setStatus(status int) { + e.mu.Lock() + defer e.mu.Unlock() + e.status = status +} + +// hasStatus checks if entry in specific status +func (e *entry) hasStatus(status int) bool { + return e.getStatus() == status +} + +// canServe returns true is service can serve. +func (e *entry) canServe() bool { + _, ok := e.svc.(Service) + return ok +} diff --git a/service/http/config.go b/service/http/config.go index 19a2e71d..e46b56cf 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/spiral/roadrunner" "strings" + "github.com/spiral/roadrunner/service" ) // Config configures RoadRunner HTTP server. @@ -24,25 +25,34 @@ type Config struct { Workers *roadrunner.ServerConfig } +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + return c.Valid() +} + // Valid validates the configuration. -func (cfg *Config) Valid() error { - if cfg.Uploads == nil { +func (c *Config) Valid() error { + if c.Uploads == nil { return errors.New("mailformed uploads config") } - if cfg.Workers == nil { + if c.Workers == nil { return errors.New("mailformed workers config") } - if cfg.Workers.Pool == nil { + if c.Workers.Pool == nil { return errors.New("mailformed workers config (pool config is missing)") } - if err := cfg.Workers.Pool.Valid(); err != nil { + if err := c.Workers.Pool.Valid(); err != nil { return err } - if !strings.Contains(cfg.Address, ":") { + if !strings.Contains(c.Address, ":") { return errors.New("mailformed server address") } diff --git a/service/http/service.go b/service/http/service.go index 7405bf37..30289e3c 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -3,7 +3,6 @@ package http import ( "context" "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" @@ -42,28 +41,14 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, c service.Container) (bool, error) { - config := &Config{} - - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { + if !cfg.Enable { return false, nil } - if err := config.Valid(); err != nil { - return false, err - } - - s.cfg = config - - // registering http RPC interface - if r, ok := c.Get(rpc.ID); ok >= service.StatusConfigured { - if h, ok := r.(*rpc.Service); ok { - h.Register(ID, &rpcServer{s}) - } + s.cfg = cfg + if r != nil { + r.Register(ID, &rpcServer{s}) } return true, nil diff --git a/service/http/service_test.go b/service/http/service_test.go index 50836b4b..b442ae51 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -42,7 +42,7 @@ func Test_Service_NoConfig(t *testing.T) { c := service.NewContainer(logger) c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{}`})) + assert.Error(t, c.Init(&testCfg{httpCfg: `{}`})) s, st := c.Get(ID) assert.NotNil(t, s) @@ -108,7 +108,7 @@ func Test_Service_Configure_Enable(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) } func Test_Service_Echo(t *testing.T) { @@ -139,10 +139,10 @@ func Test_Service_Echo(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) // should do nothing - s.Stop() + s.(*Service).Stop() go func() { c.Serve() }() time.Sleep(time.Millisecond * 100) @@ -191,7 +191,7 @@ func Test_Service_ErrorEcho(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) goterr := make(chan interface{}) s.(*Service).AddListener(func(event int, ctx interface{}) { @@ -251,7 +251,7 @@ func Test_Service_Middleware(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -325,7 +325,7 @@ func Test_Service_Listener(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) stop := make(chan interface{}) s.(*Service).AddListener(func(event int, ctx interface{}) { diff --git a/service/injector.go b/service/injector.go new file mode 100644 index 00000000..e7dfaa0b --- /dev/null +++ b/service/injector.go @@ -0,0 +1,112 @@ +package service + +import ( + "reflect" + "fmt" +) + +const initMethod = "Init" + +var noConfig = fmt.Errorf("no config has been provided") + +// calls Init method with automatically resolved arguments. +func initService(s interface{}, cfg Config, c *container) (bool, error) { + r := reflect.TypeOf(s) + + m, ok := r.MethodByName(initMethod) + if !ok { + // no Init method is presented, assuming service does not need + // initialization. + return false, nil + } + + if err := verifySignature(m); err != nil { + return false, err + } + + // hydrating + values, err := injectValues(m, s, cfg, c) + if err != nil { + return false, err + } + + // initiating service + out := m.Func.Call(values) + + if out[1].IsNil() { + return out[0].Bool(), nil + } + + return out[0].Bool(), out[1].Interface().(error) +} + +// injectValues returns slice of call arguments for service Init method. +func injectValues(m reflect.Method, s interface{}, cfg Config, c *container) (values []reflect.Value, err error) { + for i := 0; i < m.Type.NumIn(); i ++ { + v := m.Type.In(i) + + switch { + case v.ConvertibleTo(reflect.ValueOf(s).Type()): // service itself + values = append(values, reflect.ValueOf(s)) + + case v.Implements(reflect.TypeOf((*HydrateConfig)(nil)).Elem()): // automatically configured config + if cfg == nil { + // todo: generic value + return nil, noConfig + } + + sc := reflect.New(v.Elem()) + if err := sc.Interface().(HydrateConfig).Hydrate(cfg); err != nil { + return nil, err + } + + values = append(values, sc) + + case v.Implements(reflect.TypeOf((*Config)(nil)).Elem()): // config section + if cfg == nil { + // todo: generic value + return nil, noConfig + } + values = append(values, reflect.ValueOf(cfg)) + + case v.Implements(reflect.TypeOf((*Container)(nil)).Elem()): // container + values = append(values, reflect.ValueOf(c)) + + default: + found := false + + // looking for the service candidate + for _, e := range c.services { + if v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) { + found = true + values = append(values, reflect.ValueOf(e.svc)) + break + } + } + + if !found { + // placeholder (make sure to check inside the method) + values = append(values, reflect.New(v).Elem()) + } + } + } + + return +} + +// verifySignature checks if Init method has valid signature +func verifySignature(m reflect.Method) error { + if m.Type.NumOut() != 2 { + return fmt.Errorf("method Init must have exact 2 return values") + } + + if m.Type.Out(0).Kind() != reflect.Bool { + return fmt.Errorf("first return value of Init method must be bool type") + } + + if !m.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return fmt.Errorf("second return value of Init method value must be error type") + } + + return nil +} diff --git a/service/injector_test.go b/service/injector_test.go new file mode 100644 index 00000000..eaf5fa72 --- /dev/null +++ b/service/injector_test.go @@ -0,0 +1,24 @@ +package service + +import ( + "testing" + "github.com/stretchr/testify/assert" + "v/github.com/sirupsen/logrus@v1.0.5/hooks/test" + "github.com/sirupsen/logrus" +) + +func TestContainer_Init(t *testing.T) { + logger, hook := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + svc := &testService{ok: true} + + c := NewContainer(logger) + c.Register("test", svc) + c.Register("test2", struct{}{}) + + assert.Equal(t, 2, len(hook.Entries)) + + assert.NoError(t, c.Serve()) + c.Stop() +} \ No newline at end of file diff --git a/service/rpc/config.go b/service/rpc/config.go index e3168945..0485fdf6 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -5,9 +5,11 @@ import ( "net" "strings" "syscall" + "github.com/spiral/roadrunner/service" ) -type config struct { +// Config defines RPC service config. +type Config struct { // Indicates if RPC connection is enabled. Enable bool @@ -15,9 +17,31 @@ type config struct { Listen string } -// listener creates new rpc socket listener. -func (cfg *config) listener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + return c.Valid() +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } @@ -29,9 +53,9 @@ func (cfg *config) listener() (net.Listener, error) { return net.Listen(dsn[0], dsn[1]) } -// dialer creates rpc socket dialer. -func (cfg *config) dialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index a953e30e..87a89a2b 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -6,10 +6,12 @@ import ( "testing" ) +// todo: test hydrate + func TestConfig_Listener(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -23,9 +25,9 @@ func TestConfig_ListenerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -39,28 +41,28 @@ func Test_Config_Error(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.listener() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_ErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) } func TestConfig_Dialer(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -74,12 +76,12 @@ func TestConfig_DialerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -93,17 +95,17 @@ func Test_Config_DialerError(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.dialer() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.dialer() + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) } diff --git a/service/rpc/service.go b/service/rpc/service.go index 82f26407..621348e8 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -3,7 +3,6 @@ package rpc import ( "errors" "github.com/spiral/goridge" - "github.com/spiral/roadrunner/service" "net/rpc" "sync" ) @@ -13,27 +12,20 @@ const ID = "rpc" // Service is RPC service. type Service struct { - cfg *config - stop chan interface{} - rpc *rpc.Server - + cfg *Config + stop chan interface{} + rpc *rpc.Server mu sync.Mutex serving bool } -// Init must return configure service and return true if service hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) { - config := &config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg *Config) (enabled bool, err error) { + if !cfg.Enable { return false, nil } - s.cfg = config + s.cfg = cfg s.rpc = rpc.NewServer() return true, nil @@ -50,7 +42,7 @@ func (s *Service) Serve() error { s.stop = make(chan interface{}) s.mu.Unlock() - ln, err := s.cfg.listener() + ln, err := s.cfg.Listener() if err != nil { return err } @@ -99,12 +91,12 @@ func (s *Service) Stop() { // - one return value, of type error // It returns an error if the receiver is not an exported type or has // no suitable methods. It also logs the error using package log. -func (s *Service) Register(name string, rcvr interface{}) error { +func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { return errors.New("RPC service is not configured") } - return s.rpc.RegisterName(name, rcvr) + return s.rpc.RegisterName(name, svc) } // Client creates new RPC client. @@ -113,7 +105,7 @@ func (s *Service) Client() (*rpc.Client, error) { return nil, errors.New("RPC service is not configured") } - conn, err := s.cfg.dialer() + conn, err := s.cfg.Dialer() if err != nil { return nil, err } diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go index d4734bb5..fc88d38d 100644 --- a/service/rpc/service_test.go +++ b/service/rpc/service_test.go @@ -1,8 +1,6 @@ package rpc import ( - "encoding/json" - "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "testing" "time" @@ -12,22 +10,9 @@ type testService struct{} func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } -type testCfg struct{ cfg string } - -func (cfg *testCfg) Get(name string) service.Config { return nil } -func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_ConfigError(t *testing.T) { - s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false`}, nil) - - assert.Error(t, err) - assert.False(t, ok) -} - func Test_Disabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false}`}, nil) + ok, err := s.Init(&Config{Enable: false}) assert.NoError(t, err) assert.False(t, ok) @@ -45,7 +30,7 @@ func Test_RegisterNotConfigured(t *testing.T) { func Test_Enabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -53,7 +38,7 @@ func Test_Enabled(t *testing.T) { func Test_StopNonServing(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -62,7 +47,7 @@ func Test_StopNonServing(t *testing.T) { func Test_Serve_Errors(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}) assert.NoError(t, err) assert.True(t, ok) @@ -75,7 +60,7 @@ func Test_Serve_Errors(t *testing.T) { func Test_Serve_Client(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}) assert.NoError(t, err) assert.True(t, ok) diff --git a/service/service.go b/service/service.go deleted file mode 100644 index 6cd12b51..00000000 --- a/service/service.go +++ /dev/null @@ -1,61 +0,0 @@ -package service - -import "sync" - -// Service provides high level functionality for road runner modules. -type Service interface { - // Init must return configure service and return true if service hasStatus enabled. Must return error in case of - // misconfiguration. Services must not be used without proper configuration pushed first. - Init(cfg Config, c Container) (enabled bool, err error) - - // Serve serves. - Serve() error - - // Stop stops the service. - Stop() -} - -const ( - // StatusUndefined when service bus can not find the service. - StatusUndefined = iota - - // StatusRegistered hasStatus setStatus when service has been registered in container. - StatusRegistered - - // StatusConfigured hasStatus setStatus when service has been properly configured. - StatusConfigured - - // StatusServing hasStatus setStatus when service hasStatus currently done. - StatusServing - - // StatusStopped hasStatus setStatus when service hasStatus stopped. - StatusStopped -) - -// entry creates association between service instance and given name. -type entry struct { - name string - svc Service - mu sync.Mutex - status int -} - -// status returns service status -func (e *entry) getStatus() int { - e.mu.Lock() - defer e.mu.Unlock() - - return e.status -} - -// setStarted indicates that service hasStatus status. -func (e *entry) setStatus(status int) { - e.mu.Lock() - defer e.mu.Unlock() - e.status = status -} - -// hasStatus checks if entry in specific status -func (e *entry) hasStatus(status int) bool { - return e.getStatus() == status -} diff --git a/service/static/config.go b/service/static/config.go index 1020b8cd..6a2d1206 100644 --- a/service/static/config.go +++ b/service/static/config.go @@ -5,6 +5,7 @@ import ( "os" "path" "strings" + "github.com/spiral/roadrunner/service" ) // Config describes file location and controls access to them. @@ -20,22 +21,18 @@ type Config struct { Forbid []string } -// Forbids must return true if file extension is not allowed for the upload. -func (cfg *Config) Forbids(filename string) bool { - ext := strings.ToLower(path.Ext(filename)) - - for _, v := range cfg.Forbid { - if ext == v { - return true - } +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err } - return false + return c.Valid() } -// Valid validates existence of directory. -func (cfg *Config) Valid() error { - st, err := os.Stat(cfg.Dir) +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + st, err := os.Stat(c.Dir) if err != nil { if os.IsNotExist(err) { return errors.New("root directory does not exists") @@ -50,3 +47,16 @@ func (cfg *Config) Valid() error { return nil } + +// Forbids must return true if file extension is not allowed for the upload. +func (c *Config) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range c.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/service/static/service.go b/service/static/service.go index add242e4..968cb594 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -1,7 +1,6 @@ package static import ( - "github.com/spiral/roadrunner/service" rrttp "github.com/spiral/roadrunner/service/http" "net/http" "path" @@ -22,39 +21,18 @@ type Service struct { // Init must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, c service.Container) (enabled bool, err error) { - config := &Config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +func (s *Service) Init(cfg *Config, r *rrttp.Service) (enabled bool, err error) { + if !cfg.Enable || r == nil { return false, nil } - if err := config.Valid(); err != nil { - return false, err - } - - s.cfg = config + s.cfg = cfg s.root = http.Dir(s.cfg.Dir) - - // registering as middleware - if h, ok := c.Get(rrttp.ID); ok >= service.StatusConfigured { - if h, ok := h.(*rrttp.Service); ok { - h.AddMiddleware(s.middleware) - } - } + r.AddMiddleware(s.middleware) return true, nil } -// Serve serves the service. -func (s *Service) Serve() error { return nil } - -// Stop stops the service. -func (s *Service) Stop() {} - // middleware must return true if request/response pair is handled within the middleware. func (s *Service) middleware(f http.HandlerFunc) http.HandlerFunc { // Define the http.HandlerFunc -- cgit v1.2.3 From 73e7ac6a6ad443a7f26e8be4f1cac776b3dcff93 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 13:10:24 -0700 Subject: more tests --- service/entry_test.go | 16 ++++++++++++++++ service/rpc/service.go | 2 +- service/static/service.go | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 service/entry_test.go diff --git a/service/entry_test.go b/service/entry_test.go new file mode 100644 index 00000000..67f982ba --- /dev/null +++ b/service/entry_test.go @@ -0,0 +1,16 @@ +package service + +import ( + "testing" + "github.com/stretchr/testify/assert" +) + +func TestEntry_CanServeFalse(t *testing.T) { + e := &entry{svc: nil} + assert.False(t, e.canServe()) +} + +func TestEntry_CanServeTrue(t *testing.T) { + e := &entry{svc: &testService{}} + assert.True(t, e.canServe()) +} diff --git a/service/rpc/service.go b/service/rpc/service.go index 621348e8..6e231048 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -20,7 +20,7 @@ type Service struct { } // Init rpc service. Must return true if service is enabled. -func (s *Service) Init(cfg *Config) (enabled bool, err error) { +func (s *Service) Init(cfg *Config) (bool, error) { if !cfg.Enable { return false, nil } diff --git a/service/static/service.go b/service/static/service.go index 968cb594..98d8313c 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -21,7 +21,7 @@ type Service struct { // Init must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg *Config, r *rrttp.Service) (enabled bool, err error) { +func (s *Service) Init(cfg *Config, r *rrttp.Service) (bool, error) { if !cfg.Enable || r == nil { return false, nil } -- cgit v1.2.3 From da898982d2e8892b29824acf7c6af2f8b50878cc Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 13:11:01 -0700 Subject: more tests --- service/injector_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/injector_test.go b/service/injector_test.go index eaf5fa72..3a2bf405 100644 --- a/service/injector_test.go +++ b/service/injector_test.go @@ -3,7 +3,7 @@ package service import ( "testing" "github.com/stretchr/testify/assert" - "v/github.com/sirupsen/logrus@v1.0.5/hooks/test" + "github.com/sirupsen/logrus/hooks/test" "github.com/sirupsen/logrus" ) -- cgit v1.2.3 From 78c4f250ba81266eab64288d44cf91b85ad00ba9 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 19:49:04 -0700 Subject: request time context variable --- php-src/PSR7Client.php | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php index 858e8405..dc0070a9 100644 --- a/php-src/PSR7Client.php +++ b/php-src/PSR7Client.php @@ -64,6 +64,8 @@ class PSR7Client $bodyStream->write($body); } + $_SERVER = $this->configureServer($ctx); + $request = new Diactoros\ServerRequest( $_SERVER, $this->wrapUploads($ctx['uploads']), @@ -105,6 +107,26 @@ class PSR7Client ])); } + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param array $ctx + * @return array + */ + protected function configureServer(array $ctx): array + { + $server = $_SERVER; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + + if (!empty($ctx['remoteAddr'])) { + $server['REMOTE_ADDR'] = $ctx['remoteAddr']; + } + + return $server; + } + /** * Wraps all uploaded files with UploadedFile. * @@ -119,18 +141,18 @@ class PSR7Client } $result = []; - foreach ($files as $index => $file) { - if (!isset($file['name'])) { - $result[$index] = $this->wrapUploads($file); + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); continue; } $result[$index] = new Diactoros\UploadedFile( - $file['tmpName'], - $file['size'], - $file['error'], - $file['name'], - $file['mime'] + $f['tmpName'], + $f['size'], + $f['error'], + $f['name'], + $f['mime'] ); } -- cgit v1.2.3 From 3ef9d686381e6f888c32c707c4ed245cb546506f Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 20:13:30 -0700 Subject: remote IP --- service/container.go | 2 +- service/http/request.go | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/service/container.go b/service/container.go index a003b7e3..02ebbf26 100644 --- a/service/container.go +++ b/service/container.go @@ -158,7 +158,7 @@ func (c *container) Serve() error { continue } - c.log.Debugf("[%s]: started", e.name) + c.log.Debugf("[%s]: service started", e.name) go func(e *entry) { e.setStatus(StatusServing) defer e.setStatus(StatusStopped) diff --git a/service/http/request.go b/service/http/request.go index 912843e9..08e2a03e 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -9,6 +9,7 @@ import ( "net/url" "strings" "github.com/spiral/roadrunner/service/http/attributes" + "net" ) const ( @@ -21,6 +22,9 @@ const ( // Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + // Protocol includes HTTP protocol version. Protocol string `json:"protocol"` @@ -64,6 +68,13 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { Attributes: attributes.All(r), } + // otherwise, return remote address as is + if strings.ContainsRune(r.RemoteAddr, ':') { + req.RemoteAddr, _, _ = net.SplitHostPort(r.RemoteAddr) + } else { + req.RemoteAddr = r.RemoteAddr + } + for _, c := range r.Cookies() { if v, err := url.QueryUnescape(c.Value); err == nil { req.Cookies[c.Name] = v -- cgit v1.2.3 From 9f59155c781f0178e52c53abd2b437d40c51a52d Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 20:16:16 -0700 Subject: ip address fallback to attributes (for middleware) --- php-src/PSR7Client.php | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php index dc0070a9..e8d93fe8 100644 --- a/php-src/PSR7Client.php +++ b/php-src/PSR7Client.php @@ -107,7 +107,7 @@ class PSR7Client ])); } - /** + /** * Returns altered copy of _SERVER variable. Sets ip-address, * request-time and other values. * @@ -119,10 +119,7 @@ class PSR7Client $server = $_SERVER; $server['REQUEST_TIME'] = time(); $server['REQUEST_TIME_FLOAT'] = microtime(true); - - if (!empty($ctx['remoteAddr'])) { - $server['REMOTE_ADDR'] = $ctx['remoteAddr']; - } + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; return $server; } -- cgit v1.2.3 From dafe0c3f4877c25157b5df4ca69eb23df248ab7a Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 20:38:01 -0700 Subject: better error buffer --- cmd/rr/debug/debugger.go | 2 +- error_buffer.go | 3 +++ error_buffer_test.go | 15 +++++++++++++-- service/injector.go | 2 +- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 0dca43de..533a5947 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -58,7 +58,7 @@ func (s *debugger) listener(event int, ctx interface{}) { // outputs switch event { case roadrunner.EventStderrOutput: - s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n")) + s.logger.Warning(utils.Sprintf("%s", strings.Trim(string(ctx.([]byte)), "\r\n"))) } // rr server events diff --git a/error_buffer.go b/error_buffer.go index 8be9c5a8..211fe25f 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -43,7 +43,9 @@ func newErrBuffer() *errBuffer { if len(eb.buf) > eb.last { if eb.lsn != nil { eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + eb.buf = eb.buf[0:0] } + eb.last = len(eb.buf) } eb.mu.Unlock() @@ -55,6 +57,7 @@ func newErrBuffer() *errBuffer { if eb.lsn != nil { eb.lsn(EventStderrOutput, eb.buf[eb.last:]) } + eb.last = len(eb.buf) } eb.mu.Unlock() diff --git a/error_buffer_test.go b/error_buffer_test.go index 09ea4f03..81107935 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -30,8 +30,7 @@ func TestErrBuffer_Write_Event(t *testing.T) { <-tr // messages are read - assert.Equal(t, 6, buf.Len()) - assert.Equal(t, "hello\n", buf.String()) + assert.Equal(t, 0, buf.Len()) } func TestErrBuffer_Write_Event_Separated(t *testing.T) { @@ -50,6 +49,18 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { buf.Write([]byte("ending")) <-tr + assert.Equal(t, 0, buf.Len()) + assert.Equal(t, "", buf.String()) +} + +func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { + buf := newErrBuffer() + defer buf.Close() + + buf.Write([]byte("hel")) + buf.Write([]byte("lo\n")) + buf.Write([]byte("ending")) + assert.Equal(t, 12, buf.Len()) assert.Equal(t, "hello\nending", buf.String()) } diff --git a/service/injector.go b/service/injector.go index e7dfaa0b..2d18b651 100644 --- a/service/injector.go +++ b/service/injector.go @@ -77,7 +77,7 @@ func injectValues(m reflect.Method, s interface{}, cfg Config, c *container) (va // looking for the service candidate for _, e := range c.services { - if v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) { + if v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) && e.hasStatus(StatusOK) { found = true values = append(values, reflect.ValueOf(e.svc)) break -- cgit v1.2.3 From 89fe2046ce3b2c22188d731d00db4917fc77d834 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 20:51:22 -0700 Subject: seconds can be used in config --- service/http/config.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/service/http/config.go b/service/http/config.go index e46b56cf..ebdd0343 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner" "strings" "github.com/spiral/roadrunner/service" + "time" ) // Config configures RoadRunner HTTP server. @@ -31,7 +32,23 @@ func (c *Config) Hydrate(cfg service.Config) error { return err } - return c.Valid() + if err := c.Valid(); err != nil { + return err + } + + if c.Workers.RelayTimeout < time.Microsecond { + c.Workers.RelayTimeout = time.Second * time.Duration(c.Workers.RelayTimeout.Nanoseconds()) + } + + if c.Workers.Pool.AllocateTimeout < time.Microsecond { + c.Workers.Pool.AllocateTimeout = time.Second * time.Duration(c.Workers.Pool.AllocateTimeout.Nanoseconds()) + } + + if c.Workers.Pool.DestroyTimeout < time.Microsecond { + c.Workers.Pool.DestroyTimeout = time.Second * time.Duration(c.Workers.Pool.DestroyTimeout.Nanoseconds()) + } + + return nil } // Valid validates the configuration. -- cgit v1.2.3 From a180d5e1128f735976497dff69c8c3a1061c42c7 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:01:19 -0700 Subject: better debugging --- cmd/rr/debug/debugger.go | 48 +++++++++++++++++++++++++++++++++++------------- service/http/handler.go | 29 ++++++++++++++++------------- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 533a5947..fdf3c1be 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -4,8 +4,10 @@ import ( "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/cmd/rr/utils" - "github.com/spiral/roadrunner/service/http" + rrhttp "github.com/spiral/roadrunner/service/http" "strings" + "fmt" + "net/http" ) // Listener creates new debug listener. @@ -20,21 +22,32 @@ type debugger struct{ logger *logrus.Logger } func (s *debugger) listener(event int, ctx interface{}) { // http events switch event { - case http.EventResponse: - log := ctx.(*http.Event) - s.logger.Info(utils.Sprintf("%s %s %s", statusColor(log.Status), log.Method, log.URI)) - case http.EventError: - log := ctx.(*http.Event) - - if _, ok := log.Error.(roadrunner.JobError); ok { - s.logger.Info(utils.Sprintf("%s %s %s", statusColor(log.Status), log.Method, log.URI)) + case rrhttp.EventResponse: + e := ctx.(*rrhttp.ResponseEvent) + s.logger.Info(utils.Sprintf( + "%s %s %s %s", + e.Request.RemoteAddr, + statusColor(e.Response.Status), + e.Request.Method, + e.Request.URI, + )) + case rrhttp.EventError: + e := ctx.(*rrhttp.ErrorEvent) + + if _, ok := e.Error.(roadrunner.JobError); ok { + s.logger.Info(utils.Sprintf( + "%s %s %s", + statusColor(500), + e.Request.Method, + uri(e.Request), + )) } else { s.logger.Info(utils.Sprintf( "%s %s %s %s", - statusColor(log.Status), - log.Method, - log.URI, - log.Error, + statusColor(500), + e.Request.Method, + uri(e.Request), + e.Error, )) } } @@ -93,3 +106,12 @@ func statusColor(status int) string { return utils.Sprintf("%v", status) } + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} \ No newline at end of file diff --git a/service/http/handler.go b/service/http/handler.go index 6f2617b1..9e67e5b4 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -9,26 +9,29 @@ import ( ) const ( - // EventResponse thrown after the request been processed. See Event as payload. + // EventResponse thrown after the request been processed. See ErrorEvent as payload. EventResponse = iota + 500 // EventError thrown on any non job error provided by road runner server. EventError ) -// Event represents singular http response event. -type Event struct { - // Method of the request. - Method string +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request - // URI requested by the client. - URI string + // Error - associated error, if any. + Error error +} - // Status is response status. - Status int +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request - // Associated error, if any. - Error error + // Response contains service response. + Response *Response } // Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, @@ -99,7 +102,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // handleError sends error. func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) { - h.throw(EventError, &Event{Method: r.Method, URI: uri(r), Status: 500, Error: err}) + h.throw(EventError, &ErrorEvent{Request: r, Error: err}) w.WriteHeader(500) w.Write([]byte(err.Error())) @@ -107,7 +110,7 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) // handleResponse triggers response event. func (h *Handler) handleResponse(req *Request, resp *Response) { - h.throw(EventResponse, &Event{Method: req.Method, URI: req.URI, Status: resp.Status}) + h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp}) } // throw invokes event srv if any. -- cgit v1.2.3 From ef38a99429c9916137ecd5953703a694bfc1c569 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:01:40 -0700 Subject: go fmt --- cmd/rr/debug/debugger.go | 6 +++--- service/entry_test.go | 2 +- service/http/config.go | 2 +- service/http/request.go | 4 ++-- service/http/service.go | 2 +- service/injector.go | 4 ++-- service/injector_test.go | 8 ++++---- service/rpc/config.go | 2 +- service/static/config.go | 2 +- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index fdf3c1be..8ec116c2 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -1,13 +1,13 @@ package debug import ( + "fmt" "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/cmd/rr/utils" rrhttp "github.com/spiral/roadrunner/service/http" - "strings" - "fmt" "net/http" + "strings" ) // Listener creates new debug listener. @@ -114,4 +114,4 @@ func uri(r *http.Request) string { } return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} \ No newline at end of file +} diff --git a/service/entry_test.go b/service/entry_test.go index 67f982ba..b5c71a10 100644 --- a/service/entry_test.go +++ b/service/entry_test.go @@ -1,8 +1,8 @@ package service import ( - "testing" "github.com/stretchr/testify/assert" + "testing" ) func TestEntry_CanServeFalse(t *testing.T) { diff --git a/service/http/config.go b/service/http/config.go index ebdd0343..ba303aae 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -3,8 +3,8 @@ package http import ( "errors" "github.com/spiral/roadrunner" - "strings" "github.com/spiral/roadrunner/service" + "strings" "time" ) diff --git a/service/http/request.go b/service/http/request.go index 08e2a03e..6d5cc126 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -4,12 +4,12 @@ import ( "encoding/json" "fmt" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/http/attributes" "io/ioutil" + "net" "net/http" "net/url" "strings" - "github.com/spiral/roadrunner/service/http/attributes" - "net" ) const ( diff --git a/service/http/service.go b/service/http/service.go index 30289e3c..f7fdf2ab 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -3,11 +3,11 @@ package http import ( "context" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/http/attributes" "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" "sync/atomic" - "github.com/spiral/roadrunner/service/http/attributes" ) // ID contains default svc name. diff --git a/service/injector.go b/service/injector.go index 2d18b651..3c41240a 100644 --- a/service/injector.go +++ b/service/injector.go @@ -1,8 +1,8 @@ package service import ( - "reflect" "fmt" + "reflect" ) const initMethod = "Init" @@ -42,7 +42,7 @@ func initService(s interface{}, cfg Config, c *container) (bool, error) { // injectValues returns slice of call arguments for service Init method. func injectValues(m reflect.Method, s interface{}, cfg Config, c *container) (values []reflect.Value, err error) { - for i := 0; i < m.Type.NumIn(); i ++ { + for i := 0; i < m.Type.NumIn(); i++ { v := m.Type.In(i) switch { diff --git a/service/injector_test.go b/service/injector_test.go index 3a2bf405..facc3f74 100644 --- a/service/injector_test.go +++ b/service/injector_test.go @@ -1,10 +1,10 @@ package service import ( - "testing" - "github.com/stretchr/testify/assert" - "github.com/sirupsen/logrus/hooks/test" "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "testing" ) func TestContainer_Init(t *testing.T) { @@ -21,4 +21,4 @@ func TestContainer_Init(t *testing.T) { assert.NoError(t, c.Serve()) c.Stop() -} \ No newline at end of file +} diff --git a/service/rpc/config.go b/service/rpc/config.go index 0485fdf6..c37b0853 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -2,10 +2,10 @@ package rpc import ( "errors" + "github.com/spiral/roadrunner/service" "net" "strings" "syscall" - "github.com/spiral/roadrunner/service" ) // Config defines RPC service config. diff --git a/service/static/config.go b/service/static/config.go index 6a2d1206..95fdbeee 100644 --- a/service/static/config.go +++ b/service/static/config.go @@ -2,10 +2,10 @@ package static import ( "github.com/pkg/errors" + "github.com/spiral/roadrunner/service" "os" "path" "strings" - "github.com/spiral/roadrunner/service" ) // Config describes file location and controls access to them. -- cgit v1.2.3 From 5f36cab25482341458d8013bfe95900fdf62456d Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:09:18 -0700 Subject: mod updated --- go.mod | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/go.mod b/go.mod index c77556f3..28a6fac0 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,16 @@ module github.com/spiral/roadrunner require ( + github.com/BurntSushi/toml v0.3.0 + github.com/StackExchange/wmi v0.0.0-20180412205111-cdffdb33acae github.com/buger/goterm v0.0.0-20180423150900-6d19e6a8df12 + github.com/davecgh/go-spew v1.1.0 github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e github.com/fsnotify/fsnotify v1.4.7 + github.com/go-ole/go-ole v1.2.1 + github.com/golang/protobuf v1.1.0 github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce + github.com/inconshreveable/mousetrap v1.0.0 github.com/magiconair/properties v1.8.0 github.com/mattn/go-colorable v0.0.9 github.com/mattn/go-isatty v0.0.3 @@ -12,9 +18,13 @@ require ( github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675 github.com/olekukonko/tablewriter v0.0.0-20180506121414-d4647c9c7a84 + github.com/onsi/ginkgo v1.5.0 + github.com/onsi/gomega v1.4.0 github.com/pelletier/go-toml v1.2.0 github.com/pkg/errors v0.8.0 + github.com/pmezard/go-difflib v1.0.0 github.com/shirou/gopsutil v0.0.0-20180613084040-c23bcca55e77 + github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 github.com/sirupsen/logrus v1.0.5 github.com/spf13/afero v1.1.1 github.com/spf13/cast v1.2.0 @@ -23,8 +33,13 @@ require ( github.com/spf13/pflag v1.0.1 github.com/spf13/viper v1.0.2 github.com/spiral/goridge v0.0.0-20180607130832-0351012be508 + github.com/stretchr/testify v1.2.2 golang.org/x/crypto v0.0.0-20180614221331-a8fb68e7206f + golang.org/x/net v0.0.0-20180709032641-4d581e05a3ac + golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f golang.org/x/sys v0.0.0-20180615093615-8014b7b116a6 golang.org/x/text v0.3.0 + gopkg.in/airbrake/gobrake.v2 v2.0.9 + gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 gopkg.in/yaml.v2 v2.2.1 ) -- cgit v1.2.3 From ca92fcaf9b2290b3bf2124eed6b0db6860010712 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:12:01 -0700 Subject: mod updated --- cmd/rr/.rr.yaml | 56 ---------------- cmd/rr/LICENSE | 21 ------ cmd/rr/cmd/root.go | 128 ------------------------------------ cmd/rr/cmd/serve.go | 49 -------------- cmd/rr/cmd/version.go | 10 --- cmd/rr/debug/debugger.go | 117 --------------------------------- cmd/rr/http/reset.go | 61 ------------------ cmd/rr/http/workers.go | 165 ----------------------------------------------- cmd/rr/main.go | 67 ------------------- cmd/rr/utils/cprint.go | 28 -------- 10 files changed, 702 deletions(-) delete mode 100644 cmd/rr/.rr.yaml delete mode 100644 cmd/rr/LICENSE delete mode 100644 cmd/rr/cmd/root.go delete mode 100644 cmd/rr/cmd/serve.go delete mode 100644 cmd/rr/cmd/version.go delete mode 100644 cmd/rr/debug/debugger.go delete mode 100644 cmd/rr/http/reset.go delete mode 100644 cmd/rr/http/workers.go delete mode 100644 cmd/rr/main.go delete mode 100644 cmd/rr/utils/cprint.go diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml deleted file mode 100644 index 775cd6c3..00000000 --- a/cmd/rr/.rr.yaml +++ /dev/null @@ -1,56 +0,0 @@ -# rpc bus allows php application and external clients to talk to rr services. -rpc: - # enable rpc server - enable: true - - # rpc connection DSN. Supported TCP and Unix sockets. - listen: tcp://127.0.0.1:6001 - -# http service configuration. -http: - # set to false to disable http server. - enable: true - - # http host to listen. - address: 0.0.0.0:8080 - - # max POST request size, including file uploads in MB. - maxRequest: 200 - - # file upload configuration. - uploads: - # list of file extensions which are forbidden for uploading. - forbid: [".php", ".exe", ".bat"] - - # http worker pool configuration. - workers: - # php worker command. - command: "php psr-worker.php pipes" - - # connection method (pipes, tcp://:9000, unix://socket.unix). - relay: "pipes" - - # worker pool configuration. - pool: - # number of workers to be serving. - numWorkers: 4 - - # maximum jobs per worker, 0 - unlimited. - maxJobs: 0 - - # for how long worker is allowed to be bootstrapped. In nanoseconds :( - allocateTimeout: 600000000 - - # amount of time given to worker to gracefully destruct itself. In nanoseconds :( - destroyTimeout: 600000000 - -# static file serving. -static: - # serve http static files - enable: true - - # root directory for static file (http would not serve .php and .htaccess files). - dir: "public" - - # list of extensions for forbid for serving. - forbid: [".php", ".htaccess"] \ No newline at end of file diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE deleted file mode 100644 index efb98c87..00000000 --- a/cmd/rr/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018 SpiralScout - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go deleted file mode 100644 index 086f518c..00000000 --- a/cmd/rr/cmd/root.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package cmd - -import ( - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/spiral/roadrunner/cmd/rr/utils" - "github.com/spiral/roadrunner/service" - "os" -) - -// Service bus for all the commands. -var ( - cfgFile string - verbose bool - - // Logger - shared logger. - Logger = logrus.New() - - // Container - shared service bus. - Container = service.NewContainer(Logger) - - // CLI is application endpoint. - CLI = &cobra.Command{ - Use: "rr", - SilenceErrors: true, - SilenceUsage: true, - Short: utils.Sprintf( - "RoadRunner, PHP Application Server:\nVersion: %s, %s", - Version, - BuildTime, - ), - } -) - -// ViperWrapper provides interface bridge between v configs and service.Config. -type ViperWrapper struct { - v *viper.Viper -} - -// get nested config section (sub-map), returns nil if section not found. -func (w *ViperWrapper) Get(key string) service.Config { - sub := w.v.Sub(key) - if sub == nil { - return nil - } - - return &ViperWrapper{sub} -} - -// Unmarshal unmarshal config data into given struct. -func (w *ViperWrapper) Unmarshal(out interface{}) error { - return w.v.Unmarshal(out) -} - -// Execute adds all child commands to the CLI command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the CLI. -func Execute() { - if err := CLI.Execute(); err != nil { - utils.Printf("Error: %s\n", err) - os.Exit(1) - } -} - -func init() { - CLI.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output") - CLI.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)") - - cobra.OnInitialize(func() { - if verbose { - Logger.SetLevel(logrus.DebugLevel) - } - - if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { - if err := Container.Init(cfg); err != nil { - utils.Printf("Error: %s\n", err) - os.Exit(1) - } - } - }) -} - -func initConfig(cfgFile string, path []string, name string) service.Config { - cfg := viper.New() - - if cfgFile != "" { - // Use cfg file from the flag. - cfg.SetConfigFile(cfgFile) - } else { - // automatic location - for _, p := range path { - cfg.AddConfigPath(p) - } - - cfg.SetConfigName(name) - } - - // read in environment variables that match - cfg.AutomaticEnv() - - // If a cfg file is found, read it in. - if err := cfg.ReadInConfig(); err != nil { - Logger.Warnf("config: %s", err) - return nil - } - - return &ViperWrapper{cfg} -} diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go deleted file mode 100644 index 8028395a..00000000 --- a/cmd/rr/cmd/serve.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package cmd - -import ( - "github.com/spf13/cobra" - "os" - "os/signal" - "syscall" -) - -var stopSignal = make(chan os.Signal, 1) - -func init() { - CLI.AddCommand(&cobra.Command{ - Use: "serve", - Short: "Serve RoadRunner service(s)", - Run: serveHandler, - }) - - signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM) -} - -func serveHandler(cmd *cobra.Command, args []string) { - go func() { - <-stopSignal - Container.Stop() - }() - - Container.Serve() -} diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go deleted file mode 100644 index 8cbf7f69..00000000 --- a/cmd/rr/cmd/version.go +++ /dev/null @@ -1,10 +0,0 @@ -package cmd - -import "time" - -var ( - // Version - defines build version. - Version = "development" - // BuildTime - defined build time. - BuildTime = time.Now().Format(time.RFC1123) -) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go deleted file mode 100644 index 8ec116c2..00000000 --- a/cmd/rr/debug/debugger.go +++ /dev/null @@ -1,117 +0,0 @@ -package debug - -import ( - "fmt" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/cmd/rr/utils" - rrhttp "github.com/spiral/roadrunner/service/http" - "net/http" - "strings" -) - -// Listener creates new debug listener. -func Listener(logger *logrus.Logger) func(event int, ctx interface{}) { - return (&debugger{logger}).listener -} - -// listener provide debug callback for system events. With colors! -type debugger struct{ logger *logrus.Logger } - -// listener listens to http events and generates nice looking output. -func (s *debugger) listener(event int, ctx interface{}) { - // http events - switch event { - case rrhttp.EventResponse: - e := ctx.(*rrhttp.ResponseEvent) - s.logger.Info(utils.Sprintf( - "%s %s %s %s", - e.Request.RemoteAddr, - statusColor(e.Response.Status), - e.Request.Method, - e.Request.URI, - )) - case rrhttp.EventError: - e := ctx.(*rrhttp.ErrorEvent) - - if _, ok := e.Error.(roadrunner.JobError); ok { - s.logger.Info(utils.Sprintf( - "%s %s %s", - statusColor(500), - e.Request.Method, - uri(e.Request), - )) - } else { - s.logger.Info(utils.Sprintf( - "%s %s %s %s", - statusColor(500), - e.Request.Method, - uri(e.Request), - e.Error, - )) - } - } - - switch event { - case roadrunner.EventWorkerKill: - w := ctx.(*roadrunner.Worker) - s.logger.Warning(utils.Sprintf( - "worker.%v killed", - *w.Pid, - )) - case roadrunner.EventWorkerError: - err := ctx.(roadrunner.WorkerError) - s.logger.Error(utils.Sprintf( - "worker.%v %s", - *err.Worker.Pid, - err.Caused, - )) - } - - // outputs - switch event { - case roadrunner.EventStderrOutput: - s.logger.Warning(utils.Sprintf("%s", strings.Trim(string(ctx.([]byte)), "\r\n"))) - } - - // rr server events - switch event { - case roadrunner.EventServerFailure: - s.logger.Error(utils.Sprintf("server is dead")) - } - - // pool events - switch event { - case roadrunner.EventPoolConstruct: - s.logger.Debug(utils.Sprintf("new worker pool")) - case roadrunner.EventPoolError: - s.logger.Error(utils.Sprintf("%s", ctx)) - } - - //s.logger.Warning(event, ctx) -} - -func statusColor(status int) string { - if status < 300 { - return utils.Sprintf("%v", status) - } - - if status < 400 { - return utils.Sprintf("%v", status) - } - - if status < 500 { - return utils.Sprintf("%v", status) - } - - return utils.Sprintf("%v", status) -} - -// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). -func uri(r *http.Request) string { - if r.TLS != nil { - return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) - } - - return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) -} diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go deleted file mode 100644 index 3bc089ec..00000000 --- a/cmd/rr/http/reset.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package http - -import ( - "errors" - "github.com/spf13/cobra" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/cmd/rr/utils" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" -) - -func init() { - rr.CLI.AddCommand(&cobra.Command{ - Use: "http:reset", - Short: "Reload RoadRunner worker pools for the HTTP service", - RunE: reloadHandler, - }) -} - -func reloadHandler(cmd *cobra.Command, args []string) error { - svc, st := rr.Container.Get(rpc.ID) - if st < service.StatusOK { - return errors.New("RPC service is not configured") - } - - client, err := svc.(*rpc.Service).Client() - if err != nil { - return err - } - defer client.Close() - - utils.Printf("restarting http worker pool: ") - - var r string - if err := client.Call("http.Reset", true, &r); err != nil { - return err - } - - utils.Printf("done\n") - return nil -} diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go deleted file mode 100644 index b03c273f..00000000 --- a/cmd/rr/http/workers.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package http - -import ( - "errors" - tm "github.com/buger/goterm" - "github.com/dustin/go-humanize" - "github.com/olekukonko/tablewriter" - "github.com/shirou/gopsutil/process" - "github.com/spf13/cobra" - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/cmd/rr/utils" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/http" - rrpc "github.com/spiral/roadrunner/service/rpc" - "net/rpc" - "os" - "os/signal" - "strconv" - "syscall" - "time" -) - -var ( - interactive bool - stopSignal = make(chan os.Signal, 1) -) - -func init() { - workersCommand := &cobra.Command{ - Use: "http:workers", - Short: "List workers associated with RoadRunner HTTP service", - RunE: workersHandler, - } - - workersCommand.Flags().BoolVarP( - &interactive, - "interactive", - "i", - false, - "render interactive workers table", - ) - - rr.CLI.AddCommand(workersCommand) - - signal.Notify(stopSignal, syscall.SIGTERM) - signal.Notify(stopSignal, syscall.SIGINT) -} - -func workersHandler(cmd *cobra.Command, args []string) (err error) { - defer func() { - if r, ok := recover().(error); ok { - err = r - } - }() - - svc, st := rr.Container.Get(rrpc.ID) - if st < service.StatusOK { - return errors.New("RPC service is not configured") - } - - client, err := svc.(*rrpc.Service).Client() - if err != nil { - return err - } - defer client.Close() - - if !interactive { - showWorkers(client) - return nil - } - - tm.Clear() - for { - select { - case <-stopSignal: - return nil - case <-time.NewTicker(time.Millisecond * 500).C: - tm.MoveCursor(1, 1) - showWorkers(client) - tm.Flush() - } - } -} - -func showWorkers(client *rpc.Client) { - var r http.WorkerList - if err := client.Call("http.Workers", true, &r); err != nil { - panic(err) - } - - tw := tablewriter.NewWriter(os.Stdout) - tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"}) - tw.SetColMinWidth(0, 7) - tw.SetColMinWidth(1, 9) - tw.SetColMinWidth(2, 7) - tw.SetColMinWidth(3, 7) - tw.SetColMinWidth(4, 18) - - for _, w := range r.Workers { - tw.Append([]string{ - strconv.Itoa(w.Pid), - renderStatus(w.Status), - renderJobs(w.NumJobs), - renderMemory(w.Pid), - renderAlive(time.Unix(0, w.Created)), - }) - } - - tw.Render() -} - -func renderStatus(status string) string { - switch status { - case "inactive": - return utils.Sprintf("inactive") - case "ready": - return utils.Sprintf("ready") - case "working": - return utils.Sprintf("working") - case "stopped": - return utils.Sprintf("stopped") - case "errored": - return utils.Sprintf("errored") - } - - return status -} - -func renderJobs(number int64) string { - return humanize.Comma(int64(number)) -} - -func renderAlive(t time.Time) string { - return humanize.RelTime(t, time.Now(), "ago", "") -} - -func renderMemory(pid int) string { - p, _ := process.NewProcess(int32(pid)) - i, err := p.MemoryInfo() - if err != nil { - return err.Error() - } - - return humanize.Bytes(i.RSS) -} diff --git a/cmd/rr/main.go b/cmd/rr/main.go deleted file mode 100644 index 03bef9bd..00000000 --- a/cmd/rr/main.go +++ /dev/null @@ -1,67 +0,0 @@ -// MIT License -// -// Copyright (c) 2018 SpiralScout -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package main - -import ( - rr "github.com/spiral/roadrunner/cmd/rr/cmd" - - // services (plugins) - "github.com/spiral/roadrunner/service/http" - "github.com/spiral/roadrunner/service/rpc" - "github.com/spiral/roadrunner/service/static" - - // cli plugins - "github.com/spiral/roadrunner/cmd/rr/debug" - _ "github.com/spiral/roadrunner/cmd/rr/http" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" -) - -var debugMode bool - -func main() { - // forcing text based logging - rr.Logger.Formatter = &logrus.TextFormatter{ForceColors: true} - - // provides ability to make local connection to services - rr.Container.Register(rpc.ID, &rpc.Service{}) - - // http serving - rr.Container.Register(http.ID, &http.Service{}) - - // serving static files - rr.Container.Register(static.ID, &static.Service{}) - - // debug mode - rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode") - cobra.OnInitialize(func() { - if debugMode { - service, _ := rr.Container.Get(http.ID) - service.(*http.Service).AddListener(debug.Listener(rr.Logger)) - } - }) - - // you can register additional commands using cmd.CLI - rr.Execute() -} diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go deleted file mode 100644 index 020975ec..00000000 --- a/cmd/rr/utils/cprint.go +++ /dev/null @@ -1,28 +0,0 @@ -package utils - -import ( - "fmt" - "github.com/mgutz/ansi" - "regexp" - "strings" -) - -var reg *regexp.Regexp - -func init() { - reg, _ = regexp.Compile(`<([^>]+)>`) -} - -// Printf works identically to fmt.Print but adds `color formatting support for CLI`. -func Printf(format string, args ...interface{}) { - fmt.Print(Sprintf(format, args...)) -} - -// Sprintf works identically to fmt.Sprintf but adds `color formatting support for CLI`. -func Sprintf(format string, args ...interface{}) string { - format = reg.ReplaceAllStringFunc(format, func(s string) string { - return ansi.ColorCode(strings.Trim(s, "<>/")) - }) - - return fmt.Sprintf(format, args...) -} -- cgit v1.2.3 From c78c38d07ecc7a9fb9ac089c2aee18d8e51db601 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:12:37 -0700 Subject: mod updated --- cmd/rr/.rr.yaml | 56 ++++++++++++++++ cmd/rr/LICENSE | 21 ++++++ cmd/rr/cmd/root.go | 128 ++++++++++++++++++++++++++++++++++++ cmd/rr/cmd/serve.go | 49 ++++++++++++++ cmd/rr/cmd/version.go | 11 ++++ cmd/rr/debug/debugger.go | 117 +++++++++++++++++++++++++++++++++ cmd/rr/http/reset.go | 61 ++++++++++++++++++ cmd/rr/http/workers.go | 165 +++++++++++++++++++++++++++++++++++++++++++++++ cmd/rr/main.go | 67 +++++++++++++++++++ cmd/rr/utils/cprint.go | 28 ++++++++ 10 files changed, 703 insertions(+) create mode 100644 cmd/rr/.rr.yaml create mode 100644 cmd/rr/LICENSE create mode 100644 cmd/rr/cmd/root.go create mode 100644 cmd/rr/cmd/serve.go create mode 100644 cmd/rr/cmd/version.go create mode 100644 cmd/rr/debug/debugger.go create mode 100644 cmd/rr/http/reset.go create mode 100644 cmd/rr/http/workers.go create mode 100644 cmd/rr/main.go create mode 100644 cmd/rr/utils/cprint.go diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml new file mode 100644 index 00000000..5ea6b345 --- /dev/null +++ b/cmd/rr/.rr.yaml @@ -0,0 +1,56 @@ +# rpc bus allows php application and external clients to talk to rr services. +rpc: + # enable rpc server + enable: true + + # rpc connection DSN. Supported TCP and Unix sockets. + listen: tcp://127.0.0.1:6001 + +# http service configuration. +http: + # set to false to disable http server. + enable: true + + # http host to listen. + address: 0.0.0.0:8080 + + # max POST request size, including file uploads in MB. + maxRequest: 200 + + # file upload configuration. + uploads: + # list of file extensions which are forbidden for uploading. + forbid: [".php", ".exe", ".bat"] + + # http worker pool configuration. + workers: + # php worker command. + command: "php psr-worker.php pipes" + + # connection method (pipes, tcp://:9000, unix://socket.unix). + relay: "pipes" + + # worker pool configuration. + pool: + # number of workers to be serving. + numWorkers: 4 + + # maximum jobs per worker, 0 - unlimited. + maxJobs: 0 + + # for how long worker is allowed to be bootstrapped. + allocateTimeout: 60 + + # amount of time given to worker to gracefully destruct itself. + destroyTimeout: 60 + +# static file serving. +static: + # serve http static files + enable: true + + # root directory for static file (http would not serve .php and .htaccess files). + dir: "public" + + # list of extensions for forbid for serving. + forbid: [".php", ".htaccess"] \ No newline at end of file diff --git a/cmd/rr/LICENSE b/cmd/rr/LICENSE new file mode 100644 index 00000000..efb98c87 --- /dev/null +++ b/cmd/rr/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 SpiralScout + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go new file mode 100644 index 00000000..086f518c --- /dev/null +++ b/cmd/rr/cmd/root.go @@ -0,0 +1,128 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/spiral/roadrunner/cmd/rr/utils" + "github.com/spiral/roadrunner/service" + "os" +) + +// Service bus for all the commands. +var ( + cfgFile string + verbose bool + + // Logger - shared logger. + Logger = logrus.New() + + // Container - shared service bus. + Container = service.NewContainer(Logger) + + // CLI is application endpoint. + CLI = &cobra.Command{ + Use: "rr", + SilenceErrors: true, + SilenceUsage: true, + Short: utils.Sprintf( + "RoadRunner, PHP Application Server:\nVersion: %s, %s", + Version, + BuildTime, + ), + } +) + +// ViperWrapper provides interface bridge between v configs and service.Config. +type ViperWrapper struct { + v *viper.Viper +} + +// get nested config section (sub-map), returns nil if section not found. +func (w *ViperWrapper) Get(key string) service.Config { + sub := w.v.Sub(key) + if sub == nil { + return nil + } + + return &ViperWrapper{sub} +} + +// Unmarshal unmarshal config data into given struct. +func (w *ViperWrapper) Unmarshal(out interface{}) error { + return w.v.Unmarshal(out) +} + +// Execute adds all child commands to the CLI command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the CLI. +func Execute() { + if err := CLI.Execute(); err != nil { + utils.Printf("Error: %s\n", err) + os.Exit(1) + } +} + +func init() { + CLI.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "verbose output") + CLI.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is .rr.yaml)") + + cobra.OnInitialize(func() { + if verbose { + Logger.SetLevel(logrus.DebugLevel) + } + + if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil { + if err := Container.Init(cfg); err != nil { + utils.Printf("Error: %s\n", err) + os.Exit(1) + } + } + }) +} + +func initConfig(cfgFile string, path []string, name string) service.Config { + cfg := viper.New() + + if cfgFile != "" { + // Use cfg file from the flag. + cfg.SetConfigFile(cfgFile) + } else { + // automatic location + for _, p := range path { + cfg.AddConfigPath(p) + } + + cfg.SetConfigName(name) + } + + // read in environment variables that match + cfg.AutomaticEnv() + + // If a cfg file is found, read it in. + if err := cfg.ReadInConfig(); err != nil { + Logger.Warnf("config: %s", err) + return nil + } + + return &ViperWrapper{cfg} +} diff --git a/cmd/rr/cmd/serve.go b/cmd/rr/cmd/serve.go new file mode 100644 index 00000000..8028395a --- /dev/null +++ b/cmd/rr/cmd/serve.go @@ -0,0 +1,49 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package cmd + +import ( + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" +) + +var stopSignal = make(chan os.Signal, 1) + +func init() { + CLI.AddCommand(&cobra.Command{ + Use: "serve", + Short: "Serve RoadRunner service(s)", + Run: serveHandler, + }) + + signal.Notify(stopSignal, os.Interrupt, os.Kill, syscall.SIGTERM) +} + +func serveHandler(cmd *cobra.Command, args []string) { + go func() { + <-stopSignal + Container.Stop() + }() + + Container.Serve() +} diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go new file mode 100644 index 00000000..26744922 --- /dev/null +++ b/cmd/rr/cmd/version.go @@ -0,0 +1,11 @@ +package cmd + +import "time" + +var ( + // Version - defines build version. + Version = "local" + + // BuildTime - defined build time. + BuildTime = time.Now().Format(time.RFC1123) +) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go new file mode 100644 index 00000000..8ec116c2 --- /dev/null +++ b/cmd/rr/debug/debugger.go @@ -0,0 +1,117 @@ +package debug + +import ( + "fmt" + "github.com/sirupsen/logrus" + "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/cmd/rr/utils" + rrhttp "github.com/spiral/roadrunner/service/http" + "net/http" + "strings" +) + +// Listener creates new debug listener. +func Listener(logger *logrus.Logger) func(event int, ctx interface{}) { + return (&debugger{logger}).listener +} + +// listener provide debug callback for system events. With colors! +type debugger struct{ logger *logrus.Logger } + +// listener listens to http events and generates nice looking output. +func (s *debugger) listener(event int, ctx interface{}) { + // http events + switch event { + case rrhttp.EventResponse: + e := ctx.(*rrhttp.ResponseEvent) + s.logger.Info(utils.Sprintf( + "%s %s %s %s", + e.Request.RemoteAddr, + statusColor(e.Response.Status), + e.Request.Method, + e.Request.URI, + )) + case rrhttp.EventError: + e := ctx.(*rrhttp.ErrorEvent) + + if _, ok := e.Error.(roadrunner.JobError); ok { + s.logger.Info(utils.Sprintf( + "%s %s %s", + statusColor(500), + e.Request.Method, + uri(e.Request), + )) + } else { + s.logger.Info(utils.Sprintf( + "%s %s %s %s", + statusColor(500), + e.Request.Method, + uri(e.Request), + e.Error, + )) + } + } + + switch event { + case roadrunner.EventWorkerKill: + w := ctx.(*roadrunner.Worker) + s.logger.Warning(utils.Sprintf( + "worker.%v killed", + *w.Pid, + )) + case roadrunner.EventWorkerError: + err := ctx.(roadrunner.WorkerError) + s.logger.Error(utils.Sprintf( + "worker.%v %s", + *err.Worker.Pid, + err.Caused, + )) + } + + // outputs + switch event { + case roadrunner.EventStderrOutput: + s.logger.Warning(utils.Sprintf("%s", strings.Trim(string(ctx.([]byte)), "\r\n"))) + } + + // rr server events + switch event { + case roadrunner.EventServerFailure: + s.logger.Error(utils.Sprintf("server is dead")) + } + + // pool events + switch event { + case roadrunner.EventPoolConstruct: + s.logger.Debug(utils.Sprintf("new worker pool")) + case roadrunner.EventPoolError: + s.logger.Error(utils.Sprintf("%s", ctx)) + } + + //s.logger.Warning(event, ctx) +} + +func statusColor(status int) string { + if status < 300 { + return utils.Sprintf("%v", status) + } + + if status < 400 { + return utils.Sprintf("%v", status) + } + + if status < 500 { + return utils.Sprintf("%v", status) + } + + return utils.Sprintf("%v", status) +} + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go new file mode 100644 index 00000000..3bc089ec --- /dev/null +++ b/cmd/rr/http/reset.go @@ -0,0 +1,61 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "errors" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/cmd/rr/utils" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" +) + +func init() { + rr.CLI.AddCommand(&cobra.Command{ + Use: "http:reset", + Short: "Reload RoadRunner worker pools for the HTTP service", + RunE: reloadHandler, + }) +} + +func reloadHandler(cmd *cobra.Command, args []string) error { + svc, st := rr.Container.Get(rpc.ID) + if st < service.StatusOK { + return errors.New("RPC service is not configured") + } + + client, err := svc.(*rpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + utils.Printf("restarting http worker pool: ") + + var r string + if err := client.Call("http.Reset", true, &r); err != nil { + return err + } + + utils.Printf("done\n") + return nil +} diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go new file mode 100644 index 00000000..b03c273f --- /dev/null +++ b/cmd/rr/http/workers.go @@ -0,0 +1,165 @@ +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package http + +import ( + "errors" + tm "github.com/buger/goterm" + "github.com/dustin/go-humanize" + "github.com/olekukonko/tablewriter" + "github.com/shirou/gopsutil/process" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + "github.com/spiral/roadrunner/cmd/rr/utils" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/http" + rrpc "github.com/spiral/roadrunner/service/rpc" + "net/rpc" + "os" + "os/signal" + "strconv" + "syscall" + "time" +) + +var ( + interactive bool + stopSignal = make(chan os.Signal, 1) +) + +func init() { + workersCommand := &cobra.Command{ + Use: "http:workers", + Short: "List workers associated with RoadRunner HTTP service", + RunE: workersHandler, + } + + workersCommand.Flags().BoolVarP( + &interactive, + "interactive", + "i", + false, + "render interactive workers table", + ) + + rr.CLI.AddCommand(workersCommand) + + signal.Notify(stopSignal, syscall.SIGTERM) + signal.Notify(stopSignal, syscall.SIGINT) +} + +func workersHandler(cmd *cobra.Command, args []string) (err error) { + defer func() { + if r, ok := recover().(error); ok { + err = r + } + }() + + svc, st := rr.Container.Get(rrpc.ID) + if st < service.StatusOK { + return errors.New("RPC service is not configured") + } + + client, err := svc.(*rrpc.Service).Client() + if err != nil { + return err + } + defer client.Close() + + if !interactive { + showWorkers(client) + return nil + } + + tm.Clear() + for { + select { + case <-stopSignal: + return nil + case <-time.NewTicker(time.Millisecond * 500).C: + tm.MoveCursor(1, 1) + showWorkers(client) + tm.Flush() + } + } +} + +func showWorkers(client *rpc.Client) { + var r http.WorkerList + if err := client.Call("http.Workers", true, &r); err != nil { + panic(err) + } + + tw := tablewriter.NewWriter(os.Stdout) + tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 9) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 7) + tw.SetColMinWidth(4, 18) + + for _, w := range r.Workers { + tw.Append([]string{ + strconv.Itoa(w.Pid), + renderStatus(w.Status), + renderJobs(w.NumJobs), + renderMemory(w.Pid), + renderAlive(time.Unix(0, w.Created)), + }) + } + + tw.Render() +} + +func renderStatus(status string) string { + switch status { + case "inactive": + return utils.Sprintf("inactive") + case "ready": + return utils.Sprintf("ready") + case "working": + return utils.Sprintf("working") + case "stopped": + return utils.Sprintf("stopped") + case "errored": + return utils.Sprintf("errored") + } + + return status +} + +func renderJobs(number int64) string { + return humanize.Comma(int64(number)) +} + +func renderAlive(t time.Time) string { + return humanize.RelTime(t, time.Now(), "ago", "") +} + +func renderMemory(pid int) string { + p, _ := process.NewProcess(int32(pid)) + i, err := p.MemoryInfo() + if err != nil { + return err.Error() + } + + return humanize.Bytes(i.RSS) +} diff --git a/cmd/rr/main.go b/cmd/rr/main.go new file mode 100644 index 00000000..03bef9bd --- /dev/null +++ b/cmd/rr/main.go @@ -0,0 +1,67 @@ +// MIT License +// +// Copyright (c) 2018 SpiralScout +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package main + +import ( + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + + // services (plugins) + "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/service/rpc" + "github.com/spiral/roadrunner/service/static" + + // cli plugins + "github.com/spiral/roadrunner/cmd/rr/debug" + _ "github.com/spiral/roadrunner/cmd/rr/http" + + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var debugMode bool + +func main() { + // forcing text based logging + rr.Logger.Formatter = &logrus.TextFormatter{ForceColors: true} + + // provides ability to make local connection to services + rr.Container.Register(rpc.ID, &rpc.Service{}) + + // http serving + rr.Container.Register(http.ID, &http.Service{}) + + // serving static files + rr.Container.Register(static.ID, &static.Service{}) + + // debug mode + rr.CLI.PersistentFlags().BoolVarP(&debugMode, "debug", "d", false, "debug mode") + cobra.OnInitialize(func() { + if debugMode { + service, _ := rr.Container.Get(http.ID) + service.(*http.Service).AddListener(debug.Listener(rr.Logger)) + } + }) + + // you can register additional commands using cmd.CLI + rr.Execute() +} diff --git a/cmd/rr/utils/cprint.go b/cmd/rr/utils/cprint.go new file mode 100644 index 00000000..020975ec --- /dev/null +++ b/cmd/rr/utils/cprint.go @@ -0,0 +1,28 @@ +package utils + +import ( + "fmt" + "github.com/mgutz/ansi" + "regexp" + "strings" +) + +var reg *regexp.Regexp + +func init() { + reg, _ = regexp.Compile(`<([^>]+)>`) +} + +// Printf works identically to fmt.Print but adds `color formatting support for CLI`. +func Printf(format string, args ...interface{}) { + fmt.Print(Sprintf(format, args...)) +} + +// Sprintf works identically to fmt.Sprintf but adds `color formatting support for CLI`. +func Sprintf(format string, args ...interface{}) string { + format = reg.ReplaceAllStringFunc(format, func(s string) string { + return ansi.ColorCode(strings.Trim(s, "<>/")) + }) + + return fmt.Sprintf(format, args...) +} -- cgit v1.2.3 From f688229fe93016064ad67b381e774881d991b8ab Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:20:49 -0700 Subject: mod updated --- cmd/rr/cmd/root.go | 2 +- service/http/config.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go index 086f518c..1a21cfc9 100644 --- a/cmd/rr/cmd/root.go +++ b/cmd/rr/cmd/root.go @@ -58,7 +58,7 @@ type ViperWrapper struct { v *viper.Viper } -// get nested config section (sub-map), returns nil if section not found. +// Get nested config section (sub-map), returns nil if section not found. func (w *ViperWrapper) Get(key string) service.Config { sub := w.v.Sub(key) if sub == nil { diff --git a/service/http/config.go b/service/http/config.go index ba303aae..d50f59f0 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -32,6 +32,10 @@ func (c *Config) Hydrate(cfg service.Config) error { return err } + if c.Workers.Relay == "" { + c.Workers.Relay = "pipes" + } + if err := c.Valid(); err != nil { return err } -- cgit v1.2.3 From 2f2b3137d243a75a98cf2a0a3ab32e7fb407e48d Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:35:11 -0700 Subject: minor CS --- service/container.go | 140 +++++++++++++++++++++++++++++++++++++++------ service/container_test.go | 16 ++++++ service/http/config.go | 8 +-- service/injector.go | 112 ------------------------------------ service/injector_test.go | 24 -------- service/rpc/config_test.go | 2 - 6 files changed, 143 insertions(+), 159 deletions(-) delete mode 100644 service/injector.go delete mode 100644 service/injector_test.go diff --git a/service/container.go b/service/container.go index 02ebbf26..7eff4551 100644 --- a/service/container.go +++ b/service/container.go @@ -5,8 +5,27 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "sync" + "reflect" ) +var noConfig = fmt.Errorf("no config has been provided") + +// InitMethod contains name of the method to be automatically invoked while service initialization. Must return +// (bool, error). Container can be requested as well. Config can be requested in a form +// of service.Config or pointer to service specific config struct (automatically unmarshalled), config argument must +// implement service.HydrateConfig. +const InitMethod = "Init" + +// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// other services and/or configs as dependency. +type Service interface { + // Serve serves. + Serve() error + + // Stop stops the service. + Stop() +} + // Container controls all internal RR services and provides plugin based system. type Container interface { // Register add new service to the container under given name. @@ -29,18 +48,6 @@ type Container interface { Stop() } -// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept -// other services and/or configs as dependency. Container can be requested as well. Config can be requested in a form -// of service.Config or pointer to service specific config struct (automatically unmarshalled), config argument must -// implement service.HydrateConfig. -type Service interface { - // Serve serves. - Serve() error - - // Stop stops the service. - Stop() -} - // Config provides ability to slice configuration sections and unmarshal configuration data into // given structure. type Config interface { @@ -122,12 +129,11 @@ func (c *container) Init(cfg Config) error { return fmt.Errorf("service [%s] has already been configured", e.name) } - // inject service dependencies (todo: move to container) - if ok, err := initService(e.svc, cfg.Get(e.name), c); err != nil { + // inject service dependencies + if ok, err := c.initService(e.svc, cfg.Get(e.name)); err != nil { + // soft error (skipping) if err == noConfig { c.log.Warningf("[%s]: no config has been provided", e.name) - - // unable to meet dependency requirements, skippingF continue } @@ -143,7 +149,6 @@ func (c *container) Init(cfg Config) error { return nil } -//todo: refactor ???? // Serve all configured services. Non blocking. func (c *container) Serve() error { var ( @@ -193,6 +198,7 @@ func (c *container) Serve() error { // Stop sends stop command to all running services. func (c *container) Stop() { c.log.Debugf("received stop command") + for _, e := range c.services { if e.hasStatus(StatusServing) { e.svc.(Service).Stop() @@ -202,3 +208,103 @@ func (c *container) Stop() { } } } + +// calls Init method with automatically resolved arguments. +func (c *container) initService(s interface{}, segment Config) (bool, error) { + r := reflect.TypeOf(s) + + m, ok := r.MethodByName("Init") + if !ok { + // no Init method is presented, assuming service does not need initialization. + return false, nil + } + + if err := c.verifySignature(m); err != nil { + return false, err + } + + // hydrating + values, err := c.resolveValues(s, m, segment) + if err != nil { + return false, err + } + + // initiating service + out := m.Func.Call(values) + + if out[1].IsNil() { + return out[0].Bool(), nil + } + + return out[0].Bool(), out[1].Interface().(error) +} + +// resolveValues returns slice of call arguments for service Init method. +func (c *container) resolveValues(s interface{}, m reflect.Method, cfg Config) (values []reflect.Value, err error) { + for i := 0; i < m.Type.NumIn(); i++ { + v := m.Type.In(i) + + switch { + case v.ConvertibleTo(reflect.ValueOf(s).Type()): // service itself + values = append(values, reflect.ValueOf(s)) + + case v.Implements(reflect.TypeOf((*Container)(nil)).Elem()): // container + values = append(values, reflect.ValueOf(c)) + + case v.Implements(reflect.TypeOf((*HydrateConfig)(nil)).Elem()): // injectable config + if cfg == nil { + return nil, noConfig + } + + sc := reflect.New(v.Elem()) + if err := sc.Interface().(HydrateConfig).Hydrate(cfg); err != nil { + return nil, err + } + + values = append(values, sc) + + case v.Implements(reflect.TypeOf((*Config)(nil)).Elem()): // generic config section + if cfg == nil { + return nil, noConfig + } + + values = append(values, reflect.ValueOf(cfg)) + + default: // dependency on other service (resolution to nil if service can't be found) + found := false + for _, e := range c.services { + if !e.hasStatus(StatusOK) || !v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) { + continue + } + + found = true + values = append(values, reflect.ValueOf(e.svc)) + break + } + + if !found { + // placeholder (make sure to check inside the method) + values = append(values, reflect.New(v).Elem()) + } + } + } + + return +} + +// verifySignature checks if Init method has valid signature +func (c *container) verifySignature(m reflect.Method) error { + if m.Type.NumOut() != 2 { + return fmt.Errorf("method Init must have exact 2 return values") + } + + if m.Type.Out(0).Kind() != reflect.Bool { + return fmt.Errorf("first return value of Init method must be bool type") + } + + if !m.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return fmt.Errorf("second return value of Init method value must be error type") + } + + return nil +} diff --git a/service/container_test.go b/service/container_test.go index bf95cbd4..d2ca1f03 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -325,3 +325,19 @@ func TestContainer_ServeErrorMultiple(t *testing.T) { assert.IsType(t, &testService{}, s) assert.Equal(t, StatusStopped, st) } + +//func TestContainer_Init(t *testing.T) { +// logger, hook := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// svc := &testService{ok: true} +// +// c := NewContainer(logger) +// c.Register("test", svc) +// c.Register("test2", struct{}{}) +// +// assert.Equal(t, 2, len(hook.Entries)) +// +// assert.NoError(t, c.Serve()) +// c.Stop() +//} diff --git a/service/http/config.go b/service/http/config.go index d50f59f0..20a247fb 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -32,14 +32,14 @@ func (c *Config) Hydrate(cfg service.Config) error { return err } - if c.Workers.Relay == "" { - c.Workers.Relay = "pipes" - } - if err := c.Valid(); err != nil { return err } + if c.Workers.Relay == "" { + c.Workers.Relay = "pipes" + } + if c.Workers.RelayTimeout < time.Microsecond { c.Workers.RelayTimeout = time.Second * time.Duration(c.Workers.RelayTimeout.Nanoseconds()) } diff --git a/service/injector.go b/service/injector.go deleted file mode 100644 index 3c41240a..00000000 --- a/service/injector.go +++ /dev/null @@ -1,112 +0,0 @@ -package service - -import ( - "fmt" - "reflect" -) - -const initMethod = "Init" - -var noConfig = fmt.Errorf("no config has been provided") - -// calls Init method with automatically resolved arguments. -func initService(s interface{}, cfg Config, c *container) (bool, error) { - r := reflect.TypeOf(s) - - m, ok := r.MethodByName(initMethod) - if !ok { - // no Init method is presented, assuming service does not need - // initialization. - return false, nil - } - - if err := verifySignature(m); err != nil { - return false, err - } - - // hydrating - values, err := injectValues(m, s, cfg, c) - if err != nil { - return false, err - } - - // initiating service - out := m.Func.Call(values) - - if out[1].IsNil() { - return out[0].Bool(), nil - } - - return out[0].Bool(), out[1].Interface().(error) -} - -// injectValues returns slice of call arguments for service Init method. -func injectValues(m reflect.Method, s interface{}, cfg Config, c *container) (values []reflect.Value, err error) { - for i := 0; i < m.Type.NumIn(); i++ { - v := m.Type.In(i) - - switch { - case v.ConvertibleTo(reflect.ValueOf(s).Type()): // service itself - values = append(values, reflect.ValueOf(s)) - - case v.Implements(reflect.TypeOf((*HydrateConfig)(nil)).Elem()): // automatically configured config - if cfg == nil { - // todo: generic value - return nil, noConfig - } - - sc := reflect.New(v.Elem()) - if err := sc.Interface().(HydrateConfig).Hydrate(cfg); err != nil { - return nil, err - } - - values = append(values, sc) - - case v.Implements(reflect.TypeOf((*Config)(nil)).Elem()): // config section - if cfg == nil { - // todo: generic value - return nil, noConfig - } - values = append(values, reflect.ValueOf(cfg)) - - case v.Implements(reflect.TypeOf((*Container)(nil)).Elem()): // container - values = append(values, reflect.ValueOf(c)) - - default: - found := false - - // looking for the service candidate - for _, e := range c.services { - if v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) && e.hasStatus(StatusOK) { - found = true - values = append(values, reflect.ValueOf(e.svc)) - break - } - } - - if !found { - // placeholder (make sure to check inside the method) - values = append(values, reflect.New(v).Elem()) - } - } - } - - return -} - -// verifySignature checks if Init method has valid signature -func verifySignature(m reflect.Method) error { - if m.Type.NumOut() != 2 { - return fmt.Errorf("method Init must have exact 2 return values") - } - - if m.Type.Out(0).Kind() != reflect.Bool { - return fmt.Errorf("first return value of Init method must be bool type") - } - - if !m.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { - return fmt.Errorf("second return value of Init method value must be error type") - } - - return nil -} diff --git a/service/injector_test.go b/service/injector_test.go deleted file mode 100644 index facc3f74..00000000 --- a/service/injector_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package service - -import ( - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestContainer_Init(t *testing.T) { - logger, hook := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - svc := &testService{ok: true} - - c := NewContainer(logger) - c.Register("test", svc) - c.Register("test2", struct{}{}) - - assert.Equal(t, 2, len(hook.Entries)) - - assert.NoError(t, c.Serve()) - c.Stop() -} diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index 87a89a2b..8642e9ab 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -6,8 +6,6 @@ import ( "testing" ) -// todo: test hydrate - func TestConfig_Listener(t *testing.T) { cfg := &Config{Listen: "tcp://:18001"} -- cgit v1.2.3 From 436802f6ff04ec205aefb2545f33f8ccb36e6bf2 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:35:53 -0700 Subject: fixing README --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e69f90d4..911e60ff 100644 --- a/README.md +++ b/README.md @@ -95,11 +95,11 @@ http: # maximum jobs per worker, 0 - unlimited. maxJobs: 0 - # for how long pool should attempt to allocate free worker (request timeout). Nanoseconds atm. (60s) - allocateTimeout: 60000000000 + # for how long pool should attempt to allocate free worker (request timeout). + allocateTimeout: 60 - # amount of time given to worker to gracefully destruct itself. Nanoseconds atm. (30s) - destroyTimeout: 30000000000 + # amount of time given to worker to gracefully destruct itself. + destroyTimeout: 30 # static file serving. static: -- cgit v1.2.3 From 7494ed6d6a9ab94603d017a48d244f077aaa7575 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:38:36 -0700 Subject: minor cs --- cmd/rr/debug/debugger.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 8ec116c2..ed9a1a56 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -71,7 +71,10 @@ func (s *debugger) listener(event int, ctx interface{}) { // outputs switch event { case roadrunner.EventStderrOutput: - s.logger.Warning(utils.Sprintf("%s", strings.Trim(string(ctx.([]byte)), "\r\n"))) + s.logger.Warning(utils.Sprintf( + "%s", + strings.Trim(string(ctx.([]byte)), "\r\n"), + )) } // rr server events -- cgit v1.2.3 From 74a9664f7df21ef26d0718aa2f8239a0562566c6 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:44:14 -0700 Subject: more tests --- service/container_test.go | 58 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/service/container_test.go b/service/container_test.go index d2ca1f03..fbafe809 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -326,18 +326,46 @@ func TestContainer_ServeErrorMultiple(t *testing.T) { assert.Equal(t, StatusStopped, st) } -//func TestContainer_Init(t *testing.T) { -// logger, hook := test.NewNullLogger() -// logger.SetLevel(logrus.DebugLevel) -// -// svc := &testService{ok: true} -// -// c := NewContainer(logger) -// c.Register("test", svc) -// c.Register("test2", struct{}{}) -// -// assert.Equal(t, 2, len(hook.Entries)) -// -// assert.NoError(t, c.Serve()) -// c.Stop() -//} +type testInitA struct{} + +func (t *testInitA) Init() error { + return nil +} + +type testInitB struct{} + +func (t *testInitB) Init() (int, error) { + return 0, nil +} + +func TestContainer_InitErrorA(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitA{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} + +func TestContainer_InitErrorB(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitB{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} + +type testInitC struct{} + +func TestContainer_NoInit(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} -- cgit v1.2.3 From c691f2b3e91c00d29201f06253b845f59e66959d Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:51:46 -0700 Subject: more tests --- service/container.go | 2 +- service/container_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/service/container.go b/service/container.go index 7eff4551..96f2d546 100644 --- a/service/container.go +++ b/service/container.go @@ -216,7 +216,7 @@ func (c *container) initService(s interface{}, segment Config) (bool, error) { m, ok := r.MethodByName("Init") if !ok { // no Init method is presented, assuming service does not need initialization. - return false, nil + return true, nil } if err := c.verifySignature(m); err != nil { diff --git a/service/container_test.go b/service/container_test.go index fbafe809..8cb97c74 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -66,7 +66,7 @@ func (t *testService) setChan(c chan interface{}) { type testCfg struct{ cfg string } func (cfg *testCfg) Get(name string) Config { - vars := make(map[string]string) + vars := make(map[string]interface{}) json.Unmarshal([]byte(cfg.cfg), &vars) v, ok := vars[name] @@ -74,7 +74,8 @@ func (cfg *testCfg) Get(name string) Config { return nil } - return &testCfg{cfg: v} + d, _ := json.Marshal(v) + return &testCfg{cfg: string(d)} } func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } @@ -369,3 +370,62 @@ func TestContainer_NoInit(t *testing.T) { assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) } + +type testInitD struct { + c *testInitC +} + +type DCfg struct { + V string +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *DCfg) Hydrate(cfg Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.V == "fail" { + return errors.New("failed config") + } + + return nil +} + +func (t *testInitD) Init(r *testInitC, c Container, cfg *DCfg) (bool, error) { + if r == nil { + return false, errors.New("unable to find testInitC") + } + + if c == nil { + return false, errors.New("unable to find Container") + } + + if cfg.V != "ok" { + return false, errors.New("invalid config") + } + + return false, nil +} + +func TestContainer_InitDependency(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + c.Register("test2", &testInitD{}) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"ok"}}`})) +} + +func TestContainer_InitDependencyFail(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + c.Register("test2", &testInitD{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"fail"}}`})) +} -- cgit v1.2.3 From 528eb2c1b9befaac82a85de9c6d7dea86da0aaac Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:57:15 -0700 Subject: more tests --- service/http/config_test.go | 21 +++++++++++++++++++++ service/rpc/config_test.go | 21 +++++++++++++++++++++ service/static/config_test.go | 21 +++++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/service/http/config_test.go b/service/http/config_test.go index cb804f4a..86622cdf 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -6,8 +6,29 @@ import ( "os" "testing" "time" + "github.com/spiral/roadrunner/service" + "encoding/json" ) +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"enable": true}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func Test_Config_Valid(t *testing.T) { cfg := &Config{ Enable: true, diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index 8642e9ab..95eb41b4 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -4,8 +4,29 @@ import ( "github.com/stretchr/testify/assert" "runtime" "testing" + "encoding/json" + "github.com/spiral/roadrunner/service" ) +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) service.Config { return nil } +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate(t *testing.T) { + cfg := &testCfg{`{"enable": true, "listen": "tcp://:18001"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &testCfg{`{"enable": true, "listen": "invalid"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func TestConfig_Listener(t *testing.T) { cfg := &Config{Listen: "tcp://:18001"} diff --git a/service/static/config_test.go b/service/static/config_test.go index d2099cdf..baa9776f 100644 --- a/service/static/config_test.go +++ b/service/static/config_test.go @@ -3,8 +3,29 @@ package static import ( "github.com/stretchr/testify/assert" "testing" + "github.com/spiral/roadrunner/service" + "encoding/json" ) +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate(t *testing.T) { + cfg := &mockCfg{`{"dir": "./"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func TestConfig_Forbids(t *testing.T) { cfg := Config{Forbid: []string{".php"}} -- cgit v1.2.3 From 55098299c87761c0545ee7a98e0132fa48c5d79f Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 21:58:07 -0700 Subject: more tests --- CHANGELOG.md | 4 ++++ build.sh | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57efd5a1..66555957 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ CHANGELOG ========= +v1.1.0 (80.07.2018) +------- +- + v1.0.5 (30.06.2018) ------- - docker compatible logging (forcing TTY output for logrus) diff --git a/build.sh b/build.sh index 6765a431..3ca8ef46 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" # Pushes application version into the build information. -RR_VERSION=1.0.5 +RR_VERSION=1.1.0 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" -- cgit v1.2.3 From 21bd058003a159ff307565d5b57e3631921a7a96 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 8 Jul 2018 22:01:40 -0700 Subject: releasing and CS for v1.1.0 --- CHANGELOG.md | 9 ++++++++- service/container.go | 2 +- service/http/config_test.go | 4 ++-- service/rpc/config_test.go | 4 ++-- service/static/config_test.go | 4 ++-- 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66555957..e4d41d6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,14 @@ CHANGELOG v1.1.0 (80.07.2018) ------- -- +- bugfix: Wrong values for $_SERVER['REQUEST_TIME'] and $_SERVER['REQUEST_TIME_FLOAT'] +- rr now resolves remoteAddr (ip-address) +- improvements in error buffer +- support for custom configs and dependency injection for services +- support for net/http native middlewares +- better debugger +- config pre-processing now allows second values for http service timeouts +- support for non serving services v1.0.5 (30.06.2018) ------- diff --git a/service/container.go b/service/container.go index 96f2d546..436d2e5f 100644 --- a/service/container.go +++ b/service/container.go @@ -4,8 +4,8 @@ import ( "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "sync" "reflect" + "sync" ) var noConfig = fmt.Errorf("no config has been provided") diff --git a/service/http/config_test.go b/service/http/config_test.go index 86622cdf..2e3fe731 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -1,13 +1,13 @@ package http import ( + "encoding/json" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "os" "testing" "time" - "github.com/spiral/roadrunner/service" - "encoding/json" ) type mockCfg struct{ cfg string } diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index 95eb41b4..a7c51c0f 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -1,11 +1,11 @@ package rpc import ( + "encoding/json" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "runtime" "testing" - "encoding/json" - "github.com/spiral/roadrunner/service" ) type testCfg struct{ cfg string } diff --git a/service/static/config_test.go b/service/static/config_test.go index baa9776f..18168d59 100644 --- a/service/static/config_test.go +++ b/service/static/config_test.go @@ -1,10 +1,10 @@ package static import ( + "encoding/json" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "testing" - "github.com/spiral/roadrunner/service" - "encoding/json" ) type mockCfg struct{ cfg string } -- cgit v1.2.3