diff options
33 files changed, 793 insertions, 370 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 57efd5a1..e4d41d6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,17 @@ CHANGELOG ========= +v1.1.0 (80.07.2018) +------- +- bugfix: Wrong values for $_SERVER['REQUEST_TIME'] and $_SERVER['REQUEST_TIME_FLOAT'] +- rr now resolves remoteAddr (ip-address) +- improvements in error buffer +- support for custom configs and dependency injection for services +- support for net/http native middlewares +- better debugger +- config pre-processing now allows second values for http service timeouts +- support for non serving services + v1.0.5 (30.06.2018) ------- - docker compatible logging (forcing TTY output for logrus) @@ -95,11 +95,11 @@ http: # maximum jobs per worker, 0 - unlimited. maxJobs: 0 - # for how long pool should attempt to allocate free worker (request timeout). Nanoseconds atm. (60s) - allocateTimeout: 60000000000 + # for how long pool should attempt to allocate free worker (request timeout). + allocateTimeout: 60 - # amount of time given to worker to gracefully destruct itself. Nanoseconds atm. (30s) - destroyTimeout: 30000000000 + # amount of time given to worker to gracefully destruct itself. + destroyTimeout: 30 # static file serving. static: @@ -2,7 +2,7 @@ cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" # Pushes application version into the build information. -RR_VERSION=1.0.5 +RR_VERSION=1.1.0 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml index 775cd6c3..5ea6b345 100644 --- a/cmd/rr/.rr.yaml +++ b/cmd/rr/.rr.yaml @@ -38,11 +38,11 @@ http: # maximum jobs per worker, 0 - unlimited. maxJobs: 0 - # for how long worker is allowed to be bootstrapped. In nanoseconds :( - allocateTimeout: 600000000 + # for how long worker is allowed to be bootstrapped. + allocateTimeout: 60 - # amount of time given to worker to gracefully destruct itself. In nanoseconds :( - destroyTimeout: 600000000 + # amount of time given to worker to gracefully destruct itself. + destroyTimeout: 60 # static file serving. static: diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go index 8cbf7f69..26744922 100644 --- a/cmd/rr/cmd/version.go +++ b/cmd/rr/cmd/version.go @@ -4,7 +4,8 @@ import "time" var ( // Version - defines build version. - Version = "development" + Version = "local" + // BuildTime - defined build time. BuildTime = time.Now().Format(time.RFC1123) ) diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go index 0dca43de..ed9a1a56 100644 --- a/cmd/rr/debug/debugger.go +++ b/cmd/rr/debug/debugger.go @@ -1,10 +1,12 @@ package debug import ( + "fmt" "github.com/sirupsen/logrus" "github.com/spiral/roadrunner" "github.com/spiral/roadrunner/cmd/rr/utils" - "github.com/spiral/roadrunner/service/http" + rrhttp "github.com/spiral/roadrunner/service/http" + "net/http" "strings" ) @@ -20,21 +22,32 @@ type debugger struct{ logger *logrus.Logger } func (s *debugger) listener(event int, ctx interface{}) { // http events switch event { - case http.EventResponse: - log := ctx.(*http.Event) - s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.URI)) - case http.EventError: - log := ctx.(*http.Event) - - if _, ok := log.Error.(roadrunner.JobError); ok { - s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.URI)) + case rrhttp.EventResponse: + e := ctx.(*rrhttp.ResponseEvent) + s.logger.Info(utils.Sprintf( + "<cyan+h>%s</reset> %s <white+hb>%s</reset> %s", + e.Request.RemoteAddr, + statusColor(e.Response.Status), + e.Request.Method, + e.Request.URI, + )) + case rrhttp.EventError: + e := ctx.(*rrhttp.ErrorEvent) + + if _, ok := e.Error.(roadrunner.JobError); ok { + s.logger.Info(utils.Sprintf( + "%s <white+hb>%s</reset> %s", + statusColor(500), + e.Request.Method, + uri(e.Request), + )) } else { s.logger.Info(utils.Sprintf( "%s <white+hb>%s</reset> %s <red>%s</reset>", - statusColor(log.Status), - log.Method, - log.URI, - log.Error, + statusColor(500), + e.Request.Method, + uri(e.Request), + e.Error, )) } } @@ -58,7 +71,10 @@ func (s *debugger) listener(event int, ctx interface{}) { // outputs switch event { case roadrunner.EventStderrOutput: - s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n")) + s.logger.Warning(utils.Sprintf( + "<yellow>%s</reset>", + strings.Trim(string(ctx.([]byte)), "\r\n"), + )) } // rr server events @@ -93,3 +109,12 @@ func statusColor(status int) string { return utils.Sprintf("<red>%v</reset>", status) } + +// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled). +func uri(r *http.Request) string { + if r.TLS != nil { + return fmt.Sprintf("https://%s%s", r.Host, r.URL.String()) + } + + return fmt.Sprintf("http://%s%s", r.Host, r.URL.String()) +} diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go index 431b7e88..3bc089ec 100644 --- a/cmd/rr/http/reset.go +++ b/cmd/rr/http/reset.go @@ -39,7 +39,7 @@ func init() { func reloadHandler(cmd *cobra.Command, args []string) error { svc, st := rr.Container.Get(rpc.ID) - if st < service.StatusConfigured { + if st < service.StatusOK { return errors.New("RPC service is not configured") } diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go index e697816f..b03c273f 100644 --- a/cmd/rr/http/workers.go +++ b/cmd/rr/http/workers.go @@ -74,7 +74,7 @@ func workersHandler(cmd *cobra.Command, args []string) (err error) { }() svc, st := rr.Container.Get(rrpc.ID) - if st < service.StatusConfigured { + if st < service.StatusOK { return errors.New("RPC service is not configured") } diff --git a/error_buffer.go b/error_buffer.go index 8be9c5a8..211fe25f 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -43,7 +43,9 @@ func newErrBuffer() *errBuffer { if len(eb.buf) > eb.last { if eb.lsn != nil { eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + eb.buf = eb.buf[0:0] } + eb.last = len(eb.buf) } eb.mu.Unlock() @@ -55,6 +57,7 @@ func newErrBuffer() *errBuffer { if eb.lsn != nil { eb.lsn(EventStderrOutput, eb.buf[eb.last:]) } + eb.last = len(eb.buf) } eb.mu.Unlock() diff --git a/error_buffer_test.go b/error_buffer_test.go index 09ea4f03..81107935 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -30,8 +30,7 @@ func TestErrBuffer_Write_Event(t *testing.T) { <-tr // messages are read - assert.Equal(t, 6, buf.Len()) - assert.Equal(t, "hello\n", buf.String()) + assert.Equal(t, 0, buf.Len()) } func TestErrBuffer_Write_Event_Separated(t *testing.T) { @@ -50,6 +49,18 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { buf.Write([]byte("ending")) <-tr + assert.Equal(t, 0, buf.Len()) + assert.Equal(t, "", buf.String()) +} + +func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { + buf := newErrBuffer() + defer buf.Close() + + buf.Write([]byte("hel")) + buf.Write([]byte("lo\n")) + buf.Write([]byte("ending")) + assert.Equal(t, 12, buf.Len()) assert.Equal(t, "hello\nending", buf.String()) } @@ -1,10 +1,16 @@ module github.com/spiral/roadrunner require ( + github.com/BurntSushi/toml v0.3.0 + github.com/StackExchange/wmi v0.0.0-20180412205111-cdffdb33acae github.com/buger/goterm v0.0.0-20180423150900-6d19e6a8df12 + github.com/davecgh/go-spew v1.1.0 github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e github.com/fsnotify/fsnotify v1.4.7 + github.com/go-ole/go-ole v1.2.1 + github.com/golang/protobuf v1.1.0 github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce + github.com/inconshreveable/mousetrap v1.0.0 github.com/magiconair/properties v1.8.0 github.com/mattn/go-colorable v0.0.9 github.com/mattn/go-isatty v0.0.3 @@ -12,9 +18,13 @@ require ( github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675 github.com/olekukonko/tablewriter v0.0.0-20180506121414-d4647c9c7a84 + github.com/onsi/ginkgo v1.5.0 + github.com/onsi/gomega v1.4.0 github.com/pelletier/go-toml v1.2.0 github.com/pkg/errors v0.8.0 + github.com/pmezard/go-difflib v1.0.0 github.com/shirou/gopsutil v0.0.0-20180613084040-c23bcca55e77 + github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 github.com/sirupsen/logrus v1.0.5 github.com/spf13/afero v1.1.1 github.com/spf13/cast v1.2.0 @@ -23,8 +33,13 @@ require ( github.com/spf13/pflag v1.0.1 github.com/spf13/viper v1.0.2 github.com/spiral/goridge v0.0.0-20180607130832-0351012be508 + github.com/stretchr/testify v1.2.2 golang.org/x/crypto v0.0.0-20180614221331-a8fb68e7206f + golang.org/x/net v0.0.0-20180709032641-4d581e05a3ac + golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f golang.org/x/sys v0.0.0-20180615093615-8014b7b116a6 golang.org/x/text v0.3.0 + gopkg.in/airbrake/gobrake.v2 v2.0.9 + gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 gopkg.in/yaml.v2 v2.2.1 ) diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php index 858e8405..e8d93fe8 100644 --- a/php-src/PSR7Client.php +++ b/php-src/PSR7Client.php @@ -64,6 +64,8 @@ class PSR7Client $bodyStream->write($body); } + $_SERVER = $this->configureServer($ctx); + $request = new Diactoros\ServerRequest( $_SERVER, $this->wrapUploads($ctx['uploads']), @@ -105,6 +107,23 @@ class PSR7Client ])); } + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param array $ctx + * @return array + */ + protected function configureServer(array $ctx): array + { + $server = $_SERVER; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + + return $server; + } + /** * Wraps all uploaded files with UploadedFile. * @@ -119,18 +138,18 @@ class PSR7Client } $result = []; - foreach ($files as $index => $file) { - if (!isset($file['name'])) { - $result[$index] = $this->wrapUploads($file); + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); continue; } $result[$index] = new Diactoros\UploadedFile( - $file['tmpName'], - $file['size'], - $file['error'], - $file['name'], - $file['mime'] + $f['tmpName'], + $f['size'], + $f['error'], + $f['name'], + $f['mime'] ); } diff --git a/service/container.go b/service/container.go index 0987b1ae..436d2e5f 100644 --- a/service/container.go +++ b/service/container.go @@ -4,23 +4,32 @@ import ( "fmt" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "reflect" "sync" ) -// Config provides ability to slice configuration sections and unmarshal configuration data into -// given structure. -type Config interface { - // Get nested config section (sub-map), returns nil if section not found. - Get(service string) Config +var noConfig = fmt.Errorf("no config has been provided") - // Unmarshal unmarshal config data into given struct. - Unmarshal(out interface{}) error +// InitMethod contains name of the method to be automatically invoked while service initialization. Must return +// (bool, error). Container can be requested as well. Config can be requested in a form +// of service.Config or pointer to service specific config struct (automatically unmarshalled), config argument must +// implement service.HydrateConfig. +const InitMethod = "Init" + +// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept +// other services and/or configs as dependency. +type Service interface { + // Serve serves. + Serve() error + + // Stop stops the service. + Stop() } // Container controls all internal RR services and provides plugin based system. type Container interface { // Register add new service to the container under given name. - Register(name string, service Service) + Register(name string, service interface{}) // Reconfigure configures all underlying services with given configuration. Init(cfg Config) error @@ -28,9 +37,9 @@ type Container interface { // Check if svc has been registered. Has(service string) bool - // Get returns svc instance by it's name or nil if svc not found. Method returns current service status + // get returns svc instance by it's name or nil if svc not found. Method returns current service status // as second value. - Get(service string) (svc Service, status int) + Get(service string) (svc interface{}, status int) // Serve all configured services. Non blocking. Serve() error @@ -39,6 +48,24 @@ type Container interface { Stop() } +// Config provides ability to slice configuration sections and unmarshal configuration data into +// given structure. +type Config interface { + // get nested config section (sub-map), returns nil if section not found. + Get(service string) Config + + // Unmarshal unmarshal config data into given struct. + Unmarshal(out interface{}) error +} + +// HydrateConfig provides ability to automatically hydrate config with values using +// service.Config as the source. +type HydrateConfig interface { + // Hydrate must populate config values using given config source. + // Must return error if config is not valid. + Hydrate(cfg Config) error +} + type container struct { log logrus.FieldLogger mu sync.Mutex @@ -54,7 +81,7 @@ func NewContainer(log logrus.FieldLogger) Container { } // Register add new service to the container under given name. -func (c *container) Register(name string, service Service) { +func (c *container) Register(name string, service interface{}) { c.mu.Lock() defer c.mu.Unlock() @@ -81,8 +108,8 @@ func (c *container) Has(target string) bool { return false } -// Get returns svc instance by it's name or nil if svc not found. -func (c *container) Get(target string) (svc Service, status int) { +// get returns svc instance by it's name or nil if svc not found. +func (c *container) Get(target string) (svc interface{}, status int) { c.mu.Lock() defer c.mu.Unlock() @@ -98,21 +125,24 @@ func (c *container) Get(target string) (svc Service, status int) { // Init configures all underlying services with given configuration. func (c *container) Init(cfg Config) error { for _, e := range c.services { - if e.getStatus() >= StatusConfigured { + if e.getStatus() >= StatusOK { return fmt.Errorf("service [%s] has already been configured", e.name) } - segment := cfg.Get(e.name) - if segment == nil { - c.log.Debugf("[%s]: no config has been provided", e.name) - continue - } + // inject service dependencies + if ok, err := c.initService(e.svc, cfg.Get(e.name)); err != nil { + // soft error (skipping) + if err == noConfig { + c.log.Warningf("[%s]: no config has been provided", e.name) + continue + } - ok, err := e.svc.Init(segment, c) - if err != nil { return errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) } else if ok { - e.setStatus(StatusConfigured) + e.setStatus(StatusOK) + c.log.Debugf("[%s]: initiated", e.name) + } else { + c.log.Debugf("[%s]: disabled", e.name) } } @@ -127,18 +157,18 @@ func (c *container) Serve() error { ) for _, e := range c.services { - if e.hasStatus(StatusConfigured) { + if e.hasStatus(StatusOK) && e.canServe() { numServing++ } else { continue } - c.log.Debugf("[%s]: started", e.name) + c.log.Debugf("[%s]: service started", e.name) go func(e *entry) { e.setStatus(StatusServing) defer e.setStatus(StatusStopped) - if err := e.svc.Serve(); err != nil { + if err := e.svc.(Service).Serve(); err != nil { c.log.Errorf("[%s]: %s", e.name, err) done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name)) } else { @@ -167,11 +197,114 @@ func (c *container) Serve() error { // Stop sends stop command to all running services. func (c *container) Stop() { + c.log.Debugf("received stop command") + for _, e := range c.services { if e.hasStatus(StatusServing) { - e.svc.Stop() + e.svc.(Service).Stop() e.setStatus(StatusStopped) + c.log.Debugf("[%s]: stopped", e.name) } } } + +// calls Init method with automatically resolved arguments. +func (c *container) initService(s interface{}, segment Config) (bool, error) { + r := reflect.TypeOf(s) + + m, ok := r.MethodByName("Init") + if !ok { + // no Init method is presented, assuming service does not need initialization. + return true, nil + } + + if err := c.verifySignature(m); err != nil { + return false, err + } + + // hydrating + values, err := c.resolveValues(s, m, segment) + if err != nil { + return false, err + } + + // initiating service + out := m.Func.Call(values) + + if out[1].IsNil() { + return out[0].Bool(), nil + } + + return out[0].Bool(), out[1].Interface().(error) +} + +// resolveValues returns slice of call arguments for service Init method. +func (c *container) resolveValues(s interface{}, m reflect.Method, cfg Config) (values []reflect.Value, err error) { + for i := 0; i < m.Type.NumIn(); i++ { + v := m.Type.In(i) + + switch { + case v.ConvertibleTo(reflect.ValueOf(s).Type()): // service itself + values = append(values, reflect.ValueOf(s)) + + case v.Implements(reflect.TypeOf((*Container)(nil)).Elem()): // container + values = append(values, reflect.ValueOf(c)) + + case v.Implements(reflect.TypeOf((*HydrateConfig)(nil)).Elem()): // injectable config + if cfg == nil { + return nil, noConfig + } + + sc := reflect.New(v.Elem()) + if err := sc.Interface().(HydrateConfig).Hydrate(cfg); err != nil { + return nil, err + } + + values = append(values, sc) + + case v.Implements(reflect.TypeOf((*Config)(nil)).Elem()): // generic config section + if cfg == nil { + return nil, noConfig + } + + values = append(values, reflect.ValueOf(cfg)) + + default: // dependency on other service (resolution to nil if service can't be found) + found := false + for _, e := range c.services { + if !e.hasStatus(StatusOK) || !v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) { + continue + } + + found = true + values = append(values, reflect.ValueOf(e.svc)) + break + } + + if !found { + // placeholder (make sure to check inside the method) + values = append(values, reflect.New(v).Elem()) + } + } + } + + return +} + +// verifySignature checks if Init method has valid signature +func (c *container) verifySignature(m reflect.Method) error { + if m.Type.NumOut() != 2 { + return fmt.Errorf("method Init must have exact 2 return values") + } + + if m.Type.Out(0).Kind() != reflect.Bool { + return fmt.Errorf("first return value of Init method must be bool type") + } + + if !m.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return fmt.Errorf("second return value of Init method value must be error type") + } + + return nil +} diff --git a/service/container_test.go b/service/container_test.go index 3092be78..8cb97c74 100644 --- a/service/container_test.go +++ b/service/container_test.go @@ -66,7 +66,7 @@ func (t *testService) setChan(c chan interface{}) { type testCfg struct{ cfg string } func (cfg *testCfg) Get(name string) Config { - vars := make(map[string]string) + vars := make(map[string]interface{}) json.Unmarshal([]byte(cfg.cfg), &vars) v, ok := vars[name] @@ -74,7 +74,8 @@ func (cfg *testCfg) Get(name string) Config { return nil } - return &testCfg{cfg: v} + d, _ := json.Marshal(v) + return &testCfg{cfg: string(d)} } func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } @@ -144,7 +145,7 @@ func TestContainer_Configure(t *testing.T) { s, st := c.Get("test") assert.IsType(t, &testService{}, s) - assert.Equal(t, StatusConfigured, st) + assert.Equal(t, StatusOK, st) } func TestContainer_ConfigureNull(t *testing.T) { @@ -176,7 +177,7 @@ func TestContainer_ConfigureDisabled(t *testing.T) { assert.Equal(t, 1, len(hook.Entries)) assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`})) - assert.Equal(t, 1, len(hook.Entries)) + assert.Equal(t, 2, len(hook.Entries)) s, st := c.Get("test") assert.IsType(t, &testService{}, s) @@ -325,3 +326,106 @@ func TestContainer_ServeErrorMultiple(t *testing.T) { assert.IsType(t, &testService{}, s) assert.Equal(t, StatusStopped, st) } + +type testInitA struct{} + +func (t *testInitA) Init() error { + return nil +} + +type testInitB struct{} + +func (t *testInitB) Init() (int, error) { + return 0, nil +} + +func TestContainer_InitErrorA(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitA{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} + +func TestContainer_InitErrorB(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitB{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} + +type testInitC struct{} + +func TestContainer_NoInit(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`})) +} + +type testInitD struct { + c *testInitC +} + +type DCfg struct { + V string +} + +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *DCfg) Hydrate(cfg Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if c.V == "fail" { + return errors.New("failed config") + } + + return nil +} + +func (t *testInitD) Init(r *testInitC, c Container, cfg *DCfg) (bool, error) { + if r == nil { + return false, errors.New("unable to find testInitC") + } + + if c == nil { + return false, errors.New("unable to find Container") + } + + if cfg.V != "ok" { + return false, errors.New("invalid config") + } + + return false, nil +} + +func TestContainer_InitDependency(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + c.Register("test2", &testInitD{}) + + assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"ok"}}`})) +} + +func TestContainer_InitDependencyFail(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := NewContainer(logger) + c.Register("test", &testInitC{}) + c.Register("test2", &testInitD{}) + + assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"fail"}}`})) +} diff --git a/service/service.go b/service/entry.go index 6cd12b51..f2cbac28 100644 --- a/service/service.go +++ b/service/entry.go @@ -1,19 +1,8 @@ package service -import "sync" - -// Service provides high level functionality for road runner modules. -type Service interface { - // Init must return configure service and return true if service hasStatus enabled. Must return error in case of - // misconfiguration. Services must not be used without proper configuration pushed first. - Init(cfg Config, c Container) (enabled bool, err error) - - // Serve serves. - Serve() error - - // Stop stops the service. - Stop() -} +import ( + "sync" +) const ( // StatusUndefined when service bus can not find the service. @@ -22,8 +11,8 @@ const ( // StatusRegistered hasStatus setStatus when service has been registered in container. StatusRegistered - // StatusConfigured hasStatus setStatus when service has been properly configured. - StatusConfigured + // StatusOK hasStatus setStatus when service has been properly configured. + StatusOK // StatusServing hasStatus setStatus when service hasStatus currently done. StatusServing @@ -35,7 +24,7 @@ const ( // entry creates association between service instance and given name. type entry struct { name string - svc Service + svc interface{} mu sync.Mutex status int } @@ -59,3 +48,9 @@ func (e *entry) setStatus(status int) { func (e *entry) hasStatus(status int) bool { return e.getStatus() == status } + +// canServe returns true is service can serve. +func (e *entry) canServe() bool { + _, ok := e.svc.(Service) + return ok +} diff --git a/service/entry_test.go b/service/entry_test.go new file mode 100644 index 00000000..b5c71a10 --- /dev/null +++ b/service/entry_test.go @@ -0,0 +1,16 @@ +package service + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestEntry_CanServeFalse(t *testing.T) { + e := &entry{svc: nil} + assert.False(t, e.canServe()) +} + +func TestEntry_CanServeTrue(t *testing.T) { + e := &entry{svc: &testService{}} + assert.True(t, e.canServe()) +} diff --git a/service/http/attributes.go b/service/http/attributes.go deleted file mode 100644 index acea38a1..00000000 --- a/service/http/attributes.go +++ /dev/null @@ -1,69 +0,0 @@ -package http - -import ( - "context" - "net/http" - "errors" -) - -const contextKey = "psr:attributes" - -type attrs map[string]interface{} - -// InitAttributes returns request with new context and attribute bag. -func InitAttributes(r *http.Request) *http.Request { - return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) -} - -// AllAttributes returns all context attributes. -func AllAttributes(r *http.Request) map[string]interface{} { - v := r.Context().Value(contextKey) - if v == nil { - return attrs{} - } - - return v.(attrs) -} - -// Get gets the value from request context. It replaces any existing -// values. -func GetAttribute(r *http.Request, key string) interface{} { - v := r.Context().Value(contextKey) - if v == nil { - return nil - } - - return v.(attrs).Get(key) -} - -// Set sets the key to value. It replaces any existing -// values. Context specific. -func SetAttribute(r *http.Request, key string, value interface{}) error { - v := r.Context().Value(contextKey) - if v == nil { - return errors.New("unable to find psr:attributes context value") - } - - v.(attrs).Set(key, value) - return nil -} - -// Get gets the value associated with the given key. -func (v attrs) Get(key string) interface{} { - if v == nil { - return "" - } - - return v[key] -} - -// Set sets the key to value. It replaces any existing -// values. -func (v attrs) Set(key string, value interface{}) { - v[key] = value -} - -// Del deletes the value associated with key. -func (v attrs) Del(key string) { - delete(v, key) -} diff --git a/service/http/attributes/attributes.go b/service/http/attributes/attributes.go new file mode 100644 index 00000000..94d0e9c1 --- /dev/null +++ b/service/http/attributes/attributes.go @@ -0,0 +1,74 @@ +package attributes + +import ( + "context" + "errors" + "net/http" +) + +const contextKey = "psr:attributes" + +type attrs map[string]interface{} + +func (v attrs) get(key string) interface{} { + if v == nil { + return "" + } + + return v[key] +} + +func (v attrs) set(key string, value interface{}) { + v[key] = value +} + +func (v attrs) del(key string) { + delete(v, key) +} + +// Init returns request with new context and attribute bag. +func Init(r *http.Request) *http.Request { + return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{})) +} + +// All returns all context attributes. +func All(r *http.Request) map[string]interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return attrs{} + } + + return v.(attrs) +} + +// get gets the value from request context. It replaces any existing +// values. +func Get(r *http.Request, key string) interface{} { + v := r.Context().Value(contextKey) + if v == nil { + return nil + } + + return v.(attrs).get(key) +} + +// set sets the key to value. It replaces any existing +// values. Context specific. +func Set(r *http.Request, key string, value interface{}) error { + v := r.Context().Value(contextKey) + if v == nil { + return errors.New("unable to find `psr:attributes` context key") + } + + v.(attrs).set(key, value) + return nil +} + +// Delete deletes values associated with attribute key. +func (v attrs) Delete(key string) { + if v == nil { + return + } + + v.del(key) +} diff --git a/service/http/attributes/attributes_test.go b/service/http/attributes/attributes_test.go new file mode 100644 index 00000000..a71d6542 --- /dev/null +++ b/service/http/attributes/attributes_test.go @@ -0,0 +1,67 @@ +package attributes + +import ( + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +func TestAllAttributes(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + + assert.Equal(t, All(r), map[string]interface{}{ + "key": "value", + }) +} + +func TestAllAttributesNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestAllAttributesNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, All(r), map[string]interface{}{}) +} + +func TestGetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), "value") +} + +func TestGetAttributeNone(t *testing.T) { + r := &http.Request{} + r = Init(r) + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestGetAttributeNone2(t *testing.T) { + r := &http.Request{} + + assert.Equal(t, Get(r, "key"), nil) +} + +func TestSetAttribute(t *testing.T) { + r := &http.Request{} + r = Init(r) + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), "value") +} + +func TestSetAttributeNone(t *testing.T) { + r := &http.Request{} + + Set(r, "key", "value") + assert.Equal(t, Get(r, "key"), nil) +} diff --git a/service/http/attributes_test.go b/service/http/attributes_test.go deleted file mode 100644 index aeb7fe74..00000000 --- a/service/http/attributes_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package http - -import ( - "testing" - "net/http" - "github.com/stretchr/testify/assert" -) - -func TestAllAttributes(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - - assert.Equal(t, AllAttributes(r), map[string]interface{}{ - "key": "value", - }) -} - -func TestAllAttributesNone(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - assert.Equal(t, AllAttributes(r), map[string]interface{}{}) -} - -func TestAllAttributesNone2(t *testing.T) { - r := &http.Request{} - - assert.Equal(t, AllAttributes(r), map[string]interface{}{}) -} - -func TestGetAttribute(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), "value") -} - -func TestGetAttributeNone(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - assert.Equal(t, GetAttribute(r, "key"), nil) -} - -func TestGetAttributeNone2(t *testing.T) { - r := &http.Request{} - - assert.Equal(t, GetAttribute(r, "key"), nil) -} - -func TestSetAttribute(t *testing.T) { - r := &http.Request{} - r = InitAttributes(r) - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), "value") -} - -func TestSetAttributeNone(t *testing.T) { - r := &http.Request{} - - SetAttribute(r, "key", "value") - assert.Equal(t, GetAttribute(r, "key"), nil) -}
\ No newline at end of file diff --git a/service/http/config.go b/service/http/config.go index 19a2e71d..20a247fb 100644 --- a/service/http/config.go +++ b/service/http/config.go @@ -3,7 +3,9 @@ package http import ( "errors" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" "strings" + "time" ) // Config configures RoadRunner HTTP server. @@ -24,25 +26,54 @@ type Config struct { Workers *roadrunner.ServerConfig } +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + if err := c.Valid(); err != nil { + return err + } + + if c.Workers.Relay == "" { + c.Workers.Relay = "pipes" + } + + if c.Workers.RelayTimeout < time.Microsecond { + c.Workers.RelayTimeout = time.Second * time.Duration(c.Workers.RelayTimeout.Nanoseconds()) + } + + if c.Workers.Pool.AllocateTimeout < time.Microsecond { + c.Workers.Pool.AllocateTimeout = time.Second * time.Duration(c.Workers.Pool.AllocateTimeout.Nanoseconds()) + } + + if c.Workers.Pool.DestroyTimeout < time.Microsecond { + c.Workers.Pool.DestroyTimeout = time.Second * time.Duration(c.Workers.Pool.DestroyTimeout.Nanoseconds()) + } + + return nil +} + // Valid validates the configuration. -func (cfg *Config) Valid() error { - if cfg.Uploads == nil { +func (c *Config) Valid() error { + if c.Uploads == nil { return errors.New("mailformed uploads config") } - if cfg.Workers == nil { + if c.Workers == nil { return errors.New("mailformed workers config") } - if cfg.Workers.Pool == nil { + if c.Workers.Pool == nil { return errors.New("mailformed workers config (pool config is missing)") } - if err := cfg.Workers.Pool.Valid(); err != nil { + if err := c.Workers.Pool.Valid(); err != nil { return err } - if !strings.Contains(cfg.Address, ":") { + if !strings.Contains(c.Address, ":") { return errors.New("mailformed server address") } diff --git a/service/http/config_test.go b/service/http/config_test.go index cb804f4a..2e3fe731 100644 --- a/service/http/config_test.go +++ b/service/http/config_test.go @@ -1,13 +1,34 @@ package http import ( + "encoding/json" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "os" "testing" "time" ) +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"enable": true}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func Test_Config_Valid(t *testing.T) { cfg := &Config{ Enable: true, diff --git a/service/http/handler.go b/service/http/handler.go index 6f2617b1..9e67e5b4 100644 --- a/service/http/handler.go +++ b/service/http/handler.go @@ -9,26 +9,29 @@ import ( ) const ( - // EventResponse thrown after the request been processed. See Event as payload. + // EventResponse thrown after the request been processed. See ErrorEvent as payload. EventResponse = iota + 500 // EventError thrown on any non job error provided by road runner server. EventError ) -// Event represents singular http response event. -type Event struct { - // Method of the request. - Method string +// ErrorEvent represents singular http error event. +type ErrorEvent struct { + // Request contains client request, must not be stored. + Request *http.Request - // URI requested by the client. - URI string + // Error - associated error, if any. + Error error +} - // Status is response status. - Status int +// ResponseEvent represents singular http response event. +type ResponseEvent struct { + // Request contains client request, must not be stored. + Request *Request - // Associated error, if any. - Error error + // Response contains service response. + Response *Response } // Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers, @@ -99,7 +102,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // handleError sends error. func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) { - h.throw(EventError, &Event{Method: r.Method, URI: uri(r), Status: 500, Error: err}) + h.throw(EventError, &ErrorEvent{Request: r, Error: err}) w.WriteHeader(500) w.Write([]byte(err.Error())) @@ -107,7 +110,7 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) // handleResponse triggers response event. func (h *Handler) handleResponse(req *Request, resp *Response) { - h.throw(EventResponse, &Event{Method: req.Method, URI: req.URI, Status: resp.Status}) + h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp}) } // throw invokes event srv if any. diff --git a/service/http/request.go b/service/http/request.go index 21566416..6d5cc126 100644 --- a/service/http/request.go +++ b/service/http/request.go @@ -4,7 +4,9 @@ import ( "encoding/json" "fmt" "github.com/spiral/roadrunner" + "github.com/spiral/roadrunner/service/http/attributes" "io/ioutil" + "net" "net/http" "net/url" "strings" @@ -20,6 +22,9 @@ const ( // Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files. type Request struct { + // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address. + RemoteAddr string `json:"remoteAddr"` + // Protocol includes HTTP protocol version. Protocol string `json:"protocol"` @@ -60,7 +65,14 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) { Headers: r.Header, Cookies: make(map[string]string), RawQuery: r.URL.RawQuery, - Attributes: AllAttributes(r), + Attributes: attributes.All(r), + } + + // otherwise, return remote address as is + if strings.ContainsRune(r.RemoteAddr, ':') { + req.RemoteAddr, _, _ = net.SplitHostPort(r.RemoteAddr) + } else { + req.RemoteAddr = r.RemoteAddr } for _, c := range r.Cookies() { diff --git a/service/http/service.go b/service/http/service.go index 710cd60c..f7fdf2ab 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -3,7 +3,7 @@ package http import ( "context" "github.com/spiral/roadrunner" - "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/http/attributes" "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" @@ -41,28 +41,14 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, c service.Container) (bool, error) { - config := &Config{} - - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { + if !cfg.Enable { return false, nil } - if err := config.Valid(); err != nil { - return false, err - } - - s.cfg = config - - // registering http RPC interface - if r, ok := c.Get(rpc.ID); ok >= service.StatusConfigured { - if h, ok := r.(*rpc.Service); ok { - h.Register(ID, &rpcServer{s}) - } + s.cfg = cfg + if r != nil { + r.Register(ID, &rpcServer{s}) } return true, nil @@ -113,16 +99,17 @@ func (s *Service) Stop() { // middleware handles connection using set of mdws and rr PSR-7 server. func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { - r = InitAttributes(r) + r = attributes.Init(r) + // chaining middlewares f := s.srv.ServeHTTP for _, m := range s.mdws { f = m(f) } - f(w, r) } +// listener handles service, server and pool events. func (s *Service) listener(event int, ctx interface{}) { for _, l := range s.lsns { l(event, ctx) diff --git a/service/http/service_test.go b/service/http/service_test.go index 50836b4b..b442ae51 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -42,7 +42,7 @@ func Test_Service_NoConfig(t *testing.T) { c := service.NewContainer(logger) c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{}`})) + assert.Error(t, c.Init(&testCfg{httpCfg: `{}`})) s, st := c.Get(ID) assert.NotNil(t, s) @@ -108,7 +108,7 @@ func Test_Service_Configure_Enable(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) } func Test_Service_Echo(t *testing.T) { @@ -139,10 +139,10 @@ func Test_Service_Echo(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) // should do nothing - s.Stop() + s.(*Service).Stop() go func() { c.Serve() }() time.Sleep(time.Millisecond * 100) @@ -191,7 +191,7 @@ func Test_Service_ErrorEcho(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) goterr := make(chan interface{}) s.(*Service).AddListener(func(event int, ctx interface{}) { @@ -251,7 +251,7 @@ func Test_Service_Middleware(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -325,7 +325,7 @@ func Test_Service_Listener(t *testing.T) { s, st := c.Get(ID) assert.NotNil(t, s) - assert.Equal(t, service.StatusConfigured, st) + assert.Equal(t, service.StatusOK, st) stop := make(chan interface{}) s.(*Service).AddListener(func(event int, ctx interface{}) { diff --git a/service/rpc/config.go b/service/rpc/config.go index e3168945..c37b0853 100644 --- a/service/rpc/config.go +++ b/service/rpc/config.go @@ -2,12 +2,14 @@ package rpc import ( "errors" + "github.com/spiral/roadrunner/service" "net" "strings" "syscall" ) -type config struct { +// Config defines RPC service config. +type Config struct { // Indicates if RPC connection is enabled. Enable bool @@ -15,9 +17,31 @@ type config struct { Listen string } -// listener creates new rpc socket listener. -func (cfg *config) listener() (net.Listener, error) { - dsn := strings.Split(cfg.Listen, "://") +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + + return c.Valid() +} + +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 { + return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") + } + + return nil +} + +// Listener creates new rpc socket Listener. +func (c *Config) Listener() (net.Listener, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } @@ -29,9 +53,9 @@ func (cfg *config) listener() (net.Listener, error) { return net.Listen(dsn[0], dsn[1]) } -// dialer creates rpc socket dialer. -func (cfg *config) dialer() (net.Conn, error) { - dsn := strings.Split(cfg.Listen, "://") +// Dialer creates rpc socket Dialer. +func (c *Config) Dialer() (net.Conn, error) { + dsn := strings.Split(c.Listen, "://") if len(dsn) != 2 { return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)") } diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go index a953e30e..a7c51c0f 100644 --- a/service/rpc/config_test.go +++ b/service/rpc/config_test.go @@ -1,15 +1,36 @@ package rpc import ( + "encoding/json" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "runtime" "testing" ) +type testCfg struct{ cfg string } + +func (cfg *testCfg) Get(name string) service.Config { return nil } +func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate(t *testing.T) { + cfg := &testCfg{`{"enable": true, "listen": "tcp://:18001"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &testCfg{`{"enable": true, "listen": "invalid"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func TestConfig_Listener(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -23,9 +44,9 @@ func TestConfig_ListenerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.NoError(t, err) assert.NotNil(t, ln) defer ln.Close() @@ -39,28 +60,28 @@ func Test_Config_Error(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.listener() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_ErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() assert.Nil(t, ln) assert.Error(t, err) } func TestConfig_Dialer(t *testing.T) { - cfg := &config{Listen: "tcp://:18001"} + cfg := &Config{Listen: "tcp://:18001"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -74,12 +95,12 @@ func TestConfig_DialerUnix(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "unix://rpc.sock"} + cfg := &Config{Listen: "unix://rpc.sock"} - ln, err := cfg.listener() + ln, err := cfg.Listener() defer ln.Close() - conn, err := cfg.dialer() + conn, err := cfg.Dialer() assert.NoError(t, err) assert.NotNil(t, conn) defer conn.Close() @@ -93,17 +114,17 @@ func Test_Config_DialerError(t *testing.T) { t.Skip("not supported on " + runtime.GOOS) } - cfg := &config{Listen: "uni:unix.sock"} - ln, err := cfg.dialer() + cfg := &Config{Listen: "uni:unix.sock"} + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error()) } func Test_Config_DialerErrorMethod(t *testing.T) { - cfg := &config{Listen: "xinu://unix.sock"} + cfg := &Config{Listen: "xinu://unix.sock"} - ln, err := cfg.dialer() + ln, err := cfg.Dialer() assert.Nil(t, ln) assert.Error(t, err) } diff --git a/service/rpc/service.go b/service/rpc/service.go index 82f26407..6e231048 100644 --- a/service/rpc/service.go +++ b/service/rpc/service.go @@ -3,7 +3,6 @@ package rpc import ( "errors" "github.com/spiral/goridge" - "github.com/spiral/roadrunner/service" "net/rpc" "sync" ) @@ -13,27 +12,20 @@ const ID = "rpc" // Service is RPC service. type Service struct { - cfg *config - stop chan interface{} - rpc *rpc.Server - + cfg *Config + stop chan interface{} + rpc *rpc.Server mu sync.Mutex serving bool } -// Init must return configure service and return true if service hasStatus enabled. Must return error in case of -// misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) { - config := &config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +// Init rpc service. Must return true if service is enabled. +func (s *Service) Init(cfg *Config) (bool, error) { + if !cfg.Enable { return false, nil } - s.cfg = config + s.cfg = cfg s.rpc = rpc.NewServer() return true, nil @@ -50,7 +42,7 @@ func (s *Service) Serve() error { s.stop = make(chan interface{}) s.mu.Unlock() - ln, err := s.cfg.listener() + ln, err := s.cfg.Listener() if err != nil { return err } @@ -99,12 +91,12 @@ func (s *Service) Stop() { // - one return value, of type error // It returns an error if the receiver is not an exported type or has // no suitable methods. It also logs the error using package log. -func (s *Service) Register(name string, rcvr interface{}) error { +func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { return errors.New("RPC service is not configured") } - return s.rpc.RegisterName(name, rcvr) + return s.rpc.RegisterName(name, svc) } // Client creates new RPC client. @@ -113,7 +105,7 @@ func (s *Service) Client() (*rpc.Client, error) { return nil, errors.New("RPC service is not configured") } - conn, err := s.cfg.dialer() + conn, err := s.cfg.Dialer() if err != nil { return nil, err } diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go index d4734bb5..fc88d38d 100644 --- a/service/rpc/service_test.go +++ b/service/rpc/service_test.go @@ -1,8 +1,6 @@ package rpc import ( - "encoding/json" - "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "testing" "time" @@ -12,22 +10,9 @@ type testService struct{} func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil } -type testCfg struct{ cfg string } - -func (cfg *testCfg) Get(name string) service.Config { return nil } -func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } - -func Test_ConfigError(t *testing.T) { - s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false`}, nil) - - assert.Error(t, err) - assert.False(t, ok) -} - func Test_Disabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":false}`}, nil) + ok, err := s.Init(&Config{Enable: false}) assert.NoError(t, err) assert.False(t, ok) @@ -45,7 +30,7 @@ func Test_RegisterNotConfigured(t *testing.T) { func Test_Enabled(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -53,7 +38,7 @@ func Test_Enabled(t *testing.T) { func Test_StopNonServing(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"}) assert.NoError(t, err) assert.True(t, ok) @@ -62,7 +47,7 @@ func Test_StopNonServing(t *testing.T) { func Test_Serve_Errors(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"}) assert.NoError(t, err) assert.True(t, ok) @@ -75,7 +60,7 @@ func Test_Serve_Errors(t *testing.T) { func Test_Serve_Client(t *testing.T) { s := &Service{} - ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil) + ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"}) assert.NoError(t, err) assert.True(t, ok) diff --git a/service/static/config.go b/service/static/config.go index 1020b8cd..95fdbeee 100644 --- a/service/static/config.go +++ b/service/static/config.go @@ -2,6 +2,7 @@ package static import ( "github.com/pkg/errors" + "github.com/spiral/roadrunner/service" "os" "path" "strings" @@ -20,22 +21,18 @@ type Config struct { Forbid []string } -// Forbids must return true if file extension is not allowed for the upload. -func (cfg *Config) Forbids(filename string) bool { - ext := strings.ToLower(path.Ext(filename)) - - for _, v := range cfg.Forbid { - if ext == v { - return true - } +// Hydrate must populate Config values using given Config source. Must return error if Config is not valid. +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err } - return false + return c.Valid() } -// Valid validates existence of directory. -func (cfg *Config) Valid() error { - st, err := os.Stat(cfg.Dir) +// Valid returns nil if config is valid. +func (c *Config) Valid() error { + st, err := os.Stat(c.Dir) if err != nil { if os.IsNotExist(err) { return errors.New("root directory does not exists") @@ -50,3 +47,16 @@ func (cfg *Config) Valid() error { return nil } + +// Forbids must return true if file extension is not allowed for the upload. +func (c *Config) Forbids(filename string) bool { + ext := strings.ToLower(path.Ext(filename)) + + for _, v := range c.Forbid { + if ext == v { + return true + } + } + + return false +} diff --git a/service/static/config_test.go b/service/static/config_test.go index d2099cdf..18168d59 100644 --- a/service/static/config_test.go +++ b/service/static/config_test.go @@ -1,10 +1,31 @@ package static import ( + "encoding/json" + "github.com/spiral/roadrunner/service" "github.com/stretchr/testify/assert" "testing" ) +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate(t *testing.T) { + cfg := &mockCfg{`{"dir": "./"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + func TestConfig_Forbids(t *testing.T) { cfg := Config{Forbid: []string{".php"}} diff --git a/service/static/service.go b/service/static/service.go index add242e4..98d8313c 100644 --- a/service/static/service.go +++ b/service/static/service.go @@ -1,7 +1,6 @@ package static import ( - "github.com/spiral/roadrunner/service" rrttp "github.com/spiral/roadrunner/service/http" "net/http" "path" @@ -22,39 +21,18 @@ type Service struct { // Init must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Service) Init(cfg service.Config, c service.Container) (enabled bool, err error) { - config := &Config{} - if err := cfg.Unmarshal(config); err != nil { - return false, err - } - - if !config.Enable { +func (s *Service) Init(cfg *Config, r *rrttp.Service) (bool, error) { + if !cfg.Enable || r == nil { return false, nil } - if err := config.Valid(); err != nil { - return false, err - } - - s.cfg = config + s.cfg = cfg s.root = http.Dir(s.cfg.Dir) - - // registering as middleware - if h, ok := c.Get(rrttp.ID); ok >= service.StatusConfigured { - if h, ok := h.(*rrttp.Service); ok { - h.AddMiddleware(s.middleware) - } - } + r.AddMiddleware(s.middleware) return true, nil } -// Serve serves the service. -func (s *Service) Serve() error { return nil } - -// Stop stops the service. -func (s *Service) Stop() {} - // middleware must return true if request/response pair is handled within the middleware. func (s *Service) middleware(f http.HandlerFunc) http.HandlerFunc { // Define the http.HandlerFunc |