summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md11
-rw-r--r--README.md8
-rwxr-xr-xbuild.sh2
-rw-r--r--cmd/rr/.rr.yaml8
-rw-r--r--cmd/rr/cmd/version.go3
-rw-r--r--cmd/rr/debug/debugger.go53
-rw-r--r--cmd/rr/http/reset.go2
-rw-r--r--cmd/rr/http/workers.go2
-rw-r--r--error_buffer.go3
-rw-r--r--error_buffer_test.go15
-rw-r--r--go.mod15
-rw-r--r--php-src/PSR7Client.php35
-rw-r--r--service/container.go185
-rw-r--r--service/container_test.go112
-rw-r--r--service/entry.go (renamed from service/service.go)29
-rw-r--r--service/entry_test.go16
-rw-r--r--service/http/attributes.go69
-rw-r--r--service/http/attributes/attributes.go74
-rw-r--r--service/http/attributes/attributes_test.go67
-rw-r--r--service/http/attributes_test.go67
-rw-r--r--service/http/config.go43
-rw-r--r--service/http/config_test.go21
-rw-r--r--service/http/handler.go29
-rw-r--r--service/http/request.go14
-rw-r--r--service/http/service.go31
-rw-r--r--service/http/service_test.go14
-rw-r--r--service/rpc/config.go38
-rw-r--r--service/rpc/config_test.go57
-rw-r--r--service/rpc/service.go30
-rw-r--r--service/rpc/service_test.go25
-rw-r--r--service/static/config.go34
-rw-r--r--service/static/config_test.go21
-rw-r--r--service/static/service.go30
33 files changed, 793 insertions, 370 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57efd5a1..e4d41d6b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,17 @@
CHANGELOG
=========
+v1.1.0 (80.07.2018)
+-------
+- bugfix: Wrong values for $_SERVER['REQUEST_TIME'] and $_SERVER['REQUEST_TIME_FLOAT']
+- rr now resolves remoteAddr (ip-address)
+- improvements in error buffer
+- support for custom configs and dependency injection for services
+- support for net/http native middlewares
+- better debugger
+- config pre-processing now allows second values for http service timeouts
+- support for non serving services
+
v1.0.5 (30.06.2018)
-------
- docker compatible logging (forcing TTY output for logrus)
diff --git a/README.md b/README.md
index e69f90d4..911e60ff 100644
--- a/README.md
+++ b/README.md
@@ -95,11 +95,11 @@ http:
# maximum jobs per worker, 0 - unlimited.
maxJobs: 0
- # for how long pool should attempt to allocate free worker (request timeout). Nanoseconds atm. (60s)
- allocateTimeout: 60000000000
+ # for how long pool should attempt to allocate free worker (request timeout).
+ allocateTimeout: 60
- # amount of time given to worker to gracefully destruct itself. Nanoseconds atm. (30s)
- destroyTimeout: 30000000000
+ # amount of time given to worker to gracefully destruct itself.
+ destroyTimeout: 30
# static file serving.
static:
diff --git a/build.sh b/build.sh
index 6765a431..3ca8ef46 100755
--- a/build.sh
+++ b/build.sh
@@ -2,7 +2,7 @@
cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
# Pushes application version into the build information.
-RR_VERSION=1.0.5
+RR_VERSION=1.1.0
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
diff --git a/cmd/rr/.rr.yaml b/cmd/rr/.rr.yaml
index 775cd6c3..5ea6b345 100644
--- a/cmd/rr/.rr.yaml
+++ b/cmd/rr/.rr.yaml
@@ -38,11 +38,11 @@ http:
# maximum jobs per worker, 0 - unlimited.
maxJobs: 0
- # for how long worker is allowed to be bootstrapped. In nanoseconds :(
- allocateTimeout: 600000000
+ # for how long worker is allowed to be bootstrapped.
+ allocateTimeout: 60
- # amount of time given to worker to gracefully destruct itself. In nanoseconds :(
- destroyTimeout: 600000000
+ # amount of time given to worker to gracefully destruct itself.
+ destroyTimeout: 60
# static file serving.
static:
diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go
index 8cbf7f69..26744922 100644
--- a/cmd/rr/cmd/version.go
+++ b/cmd/rr/cmd/version.go
@@ -4,7 +4,8 @@ import "time"
var (
// Version - defines build version.
- Version = "development"
+ Version = "local"
+
// BuildTime - defined build time.
BuildTime = time.Now().Format(time.RFC1123)
)
diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go
index 0dca43de..ed9a1a56 100644
--- a/cmd/rr/debug/debugger.go
+++ b/cmd/rr/debug/debugger.go
@@ -1,10 +1,12 @@
package debug
import (
+ "fmt"
"github.com/sirupsen/logrus"
"github.com/spiral/roadrunner"
"github.com/spiral/roadrunner/cmd/rr/utils"
- "github.com/spiral/roadrunner/service/http"
+ rrhttp "github.com/spiral/roadrunner/service/http"
+ "net/http"
"strings"
)
@@ -20,21 +22,32 @@ type debugger struct{ logger *logrus.Logger }
func (s *debugger) listener(event int, ctx interface{}) {
// http events
switch event {
- case http.EventResponse:
- log := ctx.(*http.Event)
- s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.URI))
- case http.EventError:
- log := ctx.(*http.Event)
-
- if _, ok := log.Error.(roadrunner.JobError); ok {
- s.logger.Info(utils.Sprintf("%s <white+hb>%s</reset> %s", statusColor(log.Status), log.Method, log.URI))
+ case rrhttp.EventResponse:
+ e := ctx.(*rrhttp.ResponseEvent)
+ s.logger.Info(utils.Sprintf(
+ "<cyan+h>%s</reset> %s <white+hb>%s</reset> %s",
+ e.Request.RemoteAddr,
+ statusColor(e.Response.Status),
+ e.Request.Method,
+ e.Request.URI,
+ ))
+ case rrhttp.EventError:
+ e := ctx.(*rrhttp.ErrorEvent)
+
+ if _, ok := e.Error.(roadrunner.JobError); ok {
+ s.logger.Info(utils.Sprintf(
+ "%s <white+hb>%s</reset> %s",
+ statusColor(500),
+ e.Request.Method,
+ uri(e.Request),
+ ))
} else {
s.logger.Info(utils.Sprintf(
"%s <white+hb>%s</reset> %s <red>%s</reset>",
- statusColor(log.Status),
- log.Method,
- log.URI,
- log.Error,
+ statusColor(500),
+ e.Request.Method,
+ uri(e.Request),
+ e.Error,
))
}
}
@@ -58,7 +71,10 @@ func (s *debugger) listener(event int, ctx interface{}) {
// outputs
switch event {
case roadrunner.EventStderrOutput:
- s.logger.Warning(strings.Trim(string(ctx.([]byte)), "\r\n"))
+ s.logger.Warning(utils.Sprintf(
+ "<yellow>%s</reset>",
+ strings.Trim(string(ctx.([]byte)), "\r\n"),
+ ))
}
// rr server events
@@ -93,3 +109,12 @@ func statusColor(status int) string {
return utils.Sprintf("<red>%v</reset>", status)
}
+
+// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
+func uri(r *http.Request) string {
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go
index 431b7e88..3bc089ec 100644
--- a/cmd/rr/http/reset.go
+++ b/cmd/rr/http/reset.go
@@ -39,7 +39,7 @@ func init() {
func reloadHandler(cmd *cobra.Command, args []string) error {
svc, st := rr.Container.Get(rpc.ID)
- if st < service.StatusConfigured {
+ if st < service.StatusOK {
return errors.New("RPC service is not configured")
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index e697816f..b03c273f 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -74,7 +74,7 @@ func workersHandler(cmd *cobra.Command, args []string) (err error) {
}()
svc, st := rr.Container.Get(rrpc.ID)
- if st < service.StatusConfigured {
+ if st < service.StatusOK {
return errors.New("RPC service is not configured")
}
diff --git a/error_buffer.go b/error_buffer.go
index 8be9c5a8..211fe25f 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -43,7 +43,9 @@ func newErrBuffer() *errBuffer {
if len(eb.buf) > eb.last {
if eb.lsn != nil {
eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ eb.buf = eb.buf[0:0]
}
+
eb.last = len(eb.buf)
}
eb.mu.Unlock()
@@ -55,6 +57,7 @@ func newErrBuffer() *errBuffer {
if eb.lsn != nil {
eb.lsn(EventStderrOutput, eb.buf[eb.last:])
}
+
eb.last = len(eb.buf)
}
eb.mu.Unlock()
diff --git a/error_buffer_test.go b/error_buffer_test.go
index 09ea4f03..81107935 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -30,8 +30,7 @@ func TestErrBuffer_Write_Event(t *testing.T) {
<-tr
// messages are read
- assert.Equal(t, 6, buf.Len())
- assert.Equal(t, "hello\n", buf.String())
+ assert.Equal(t, 0, buf.Len())
}
func TestErrBuffer_Write_Event_Separated(t *testing.T) {
@@ -50,6 +49,18 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) {
buf.Write([]byte("ending"))
<-tr
+ assert.Equal(t, 0, buf.Len())
+ assert.Equal(t, "", buf.String())
+}
+
+func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) {
+ buf := newErrBuffer()
+ defer buf.Close()
+
+ buf.Write([]byte("hel"))
+ buf.Write([]byte("lo\n"))
+ buf.Write([]byte("ending"))
+
assert.Equal(t, 12, buf.Len())
assert.Equal(t, "hello\nending", buf.String())
}
diff --git a/go.mod b/go.mod
index c77556f3..28a6fac0 100644
--- a/go.mod
+++ b/go.mod
@@ -1,10 +1,16 @@
module github.com/spiral/roadrunner
require (
+ github.com/BurntSushi/toml v0.3.0
+ github.com/StackExchange/wmi v0.0.0-20180412205111-cdffdb33acae
github.com/buger/goterm v0.0.0-20180423150900-6d19e6a8df12
+ github.com/davecgh/go-spew v1.1.0
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e
github.com/fsnotify/fsnotify v1.4.7
+ github.com/go-ole/go-ole v1.2.1
+ github.com/golang/protobuf v1.1.0
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce
+ github.com/inconshreveable/mousetrap v1.0.0
github.com/magiconair/properties v1.8.0
github.com/mattn/go-colorable v0.0.9
github.com/mattn/go-isatty v0.0.3
@@ -12,9 +18,13 @@ require (
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b
github.com/mitchellh/mapstructure v0.0.0-20180511142126-bb74f1db0675
github.com/olekukonko/tablewriter v0.0.0-20180506121414-d4647c9c7a84
+ github.com/onsi/ginkgo v1.5.0
+ github.com/onsi/gomega v1.4.0
github.com/pelletier/go-toml v1.2.0
github.com/pkg/errors v0.8.0
+ github.com/pmezard/go-difflib v1.0.0
github.com/shirou/gopsutil v0.0.0-20180613084040-c23bcca55e77
+ github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4
github.com/sirupsen/logrus v1.0.5
github.com/spf13/afero v1.1.1
github.com/spf13/cast v1.2.0
@@ -23,8 +33,13 @@ require (
github.com/spf13/pflag v1.0.1
github.com/spf13/viper v1.0.2
github.com/spiral/goridge v0.0.0-20180607130832-0351012be508
+ github.com/stretchr/testify v1.2.2
golang.org/x/crypto v0.0.0-20180614221331-a8fb68e7206f
+ golang.org/x/net v0.0.0-20180709032641-4d581e05a3ac
+ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20180615093615-8014b7b116a6
golang.org/x/text v0.3.0
+ gopkg.in/airbrake/gobrake.v2 v2.0.9
+ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2
gopkg.in/yaml.v2 v2.2.1
)
diff --git a/php-src/PSR7Client.php b/php-src/PSR7Client.php
index 858e8405..e8d93fe8 100644
--- a/php-src/PSR7Client.php
+++ b/php-src/PSR7Client.php
@@ -64,6 +64,8 @@ class PSR7Client
$bodyStream->write($body);
}
+ $_SERVER = $this->configureServer($ctx);
+
$request = new Diactoros\ServerRequest(
$_SERVER,
$this->wrapUploads($ctx['uploads']),
@@ -105,6 +107,23 @@ class PSR7Client
]));
}
+ /**
+ * Returns altered copy of _SERVER variable. Sets ip-address,
+ * request-time and other values.
+ *
+ * @param array $ctx
+ * @return array
+ */
+ protected function configureServer(array $ctx): array
+ {
+ $server = $_SERVER;
+ $server['REQUEST_TIME'] = time();
+ $server['REQUEST_TIME_FLOAT'] = microtime(true);
+ $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
+
+ return $server;
+ }
+
/**
* Wraps all uploaded files with UploadedFile.
*
@@ -119,18 +138,18 @@ class PSR7Client
}
$result = [];
- foreach ($files as $index => $file) {
- if (!isset($file['name'])) {
- $result[$index] = $this->wrapUploads($file);
+ foreach ($files as $index => $f) {
+ if (!isset($f['name'])) {
+ $result[$index] = $this->wrapUploads($f);
continue;
}
$result[$index] = new Diactoros\UploadedFile(
- $file['tmpName'],
- $file['size'],
- $file['error'],
- $file['name'],
- $file['mime']
+ $f['tmpName'],
+ $f['size'],
+ $f['error'],
+ $f['name'],
+ $f['mime']
);
}
diff --git a/service/container.go b/service/container.go
index 0987b1ae..436d2e5f 100644
--- a/service/container.go
+++ b/service/container.go
@@ -4,23 +4,32 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
+ "reflect"
"sync"
)
-// Config provides ability to slice configuration sections and unmarshal configuration data into
-// given structure.
-type Config interface {
- // Get nested config section (sub-map), returns nil if section not found.
- Get(service string) Config
+var noConfig = fmt.Errorf("no config has been provided")
- // Unmarshal unmarshal config data into given struct.
- Unmarshal(out interface{}) error
+// InitMethod contains name of the method to be automatically invoked while service initialization. Must return
+// (bool, error). Container can be requested as well. Config can be requested in a form
+// of service.Config or pointer to service specific config struct (automatically unmarshalled), config argument must
+// implement service.HydrateConfig.
+const InitMethod = "Init"
+
+// Service can serve. Service can provide Init method which must return (bool, error) signature and might accept
+// other services and/or configs as dependency.
+type Service interface {
+ // Serve serves.
+ Serve() error
+
+ // Stop stops the service.
+ Stop()
}
// Container controls all internal RR services and provides plugin based system.
type Container interface {
// Register add new service to the container under given name.
- Register(name string, service Service)
+ Register(name string, service interface{})
// Reconfigure configures all underlying services with given configuration.
Init(cfg Config) error
@@ -28,9 +37,9 @@ type Container interface {
// Check if svc has been registered.
Has(service string) bool
- // Get returns svc instance by it's name or nil if svc not found. Method returns current service status
+ // get returns svc instance by it's name or nil if svc not found. Method returns current service status
// as second value.
- Get(service string) (svc Service, status int)
+ Get(service string) (svc interface{}, status int)
// Serve all configured services. Non blocking.
Serve() error
@@ -39,6 +48,24 @@ type Container interface {
Stop()
}
+// Config provides ability to slice configuration sections and unmarshal configuration data into
+// given structure.
+type Config interface {
+ // get nested config section (sub-map), returns nil if section not found.
+ Get(service string) Config
+
+ // Unmarshal unmarshal config data into given struct.
+ Unmarshal(out interface{}) error
+}
+
+// HydrateConfig provides ability to automatically hydrate config with values using
+// service.Config as the source.
+type HydrateConfig interface {
+ // Hydrate must populate config values using given config source.
+ // Must return error if config is not valid.
+ Hydrate(cfg Config) error
+}
+
type container struct {
log logrus.FieldLogger
mu sync.Mutex
@@ -54,7 +81,7 @@ func NewContainer(log logrus.FieldLogger) Container {
}
// Register add new service to the container under given name.
-func (c *container) Register(name string, service Service) {
+func (c *container) Register(name string, service interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -81,8 +108,8 @@ func (c *container) Has(target string) bool {
return false
}
-// Get returns svc instance by it's name or nil if svc not found.
-func (c *container) Get(target string) (svc Service, status int) {
+// get returns svc instance by it's name or nil if svc not found.
+func (c *container) Get(target string) (svc interface{}, status int) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -98,21 +125,24 @@ func (c *container) Get(target string) (svc Service, status int) {
// Init configures all underlying services with given configuration.
func (c *container) Init(cfg Config) error {
for _, e := range c.services {
- if e.getStatus() >= StatusConfigured {
+ if e.getStatus() >= StatusOK {
return fmt.Errorf("service [%s] has already been configured", e.name)
}
- segment := cfg.Get(e.name)
- if segment == nil {
- c.log.Debugf("[%s]: no config has been provided", e.name)
- continue
- }
+ // inject service dependencies
+ if ok, err := c.initService(e.svc, cfg.Get(e.name)); err != nil {
+ // soft error (skipping)
+ if err == noConfig {
+ c.log.Warningf("[%s]: no config has been provided", e.name)
+ continue
+ }
- ok, err := e.svc.Init(segment, c)
- if err != nil {
return errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
} else if ok {
- e.setStatus(StatusConfigured)
+ e.setStatus(StatusOK)
+ c.log.Debugf("[%s]: initiated", e.name)
+ } else {
+ c.log.Debugf("[%s]: disabled", e.name)
}
}
@@ -127,18 +157,18 @@ func (c *container) Serve() error {
)
for _, e := range c.services {
- if e.hasStatus(StatusConfigured) {
+ if e.hasStatus(StatusOK) && e.canServe() {
numServing++
} else {
continue
}
- c.log.Debugf("[%s]: started", e.name)
+ c.log.Debugf("[%s]: service started", e.name)
go func(e *entry) {
e.setStatus(StatusServing)
defer e.setStatus(StatusStopped)
- if err := e.svc.Serve(); err != nil {
+ if err := e.svc.(Service).Serve(); err != nil {
c.log.Errorf("[%s]: %s", e.name, err)
done <- errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
} else {
@@ -167,11 +197,114 @@ func (c *container) Serve() error {
// Stop sends stop command to all running services.
func (c *container) Stop() {
+ c.log.Debugf("received stop command")
+
for _, e := range c.services {
if e.hasStatus(StatusServing) {
- e.svc.Stop()
+ e.svc.(Service).Stop()
e.setStatus(StatusStopped)
+
c.log.Debugf("[%s]: stopped", e.name)
}
}
}
+
+// calls Init method with automatically resolved arguments.
+func (c *container) initService(s interface{}, segment Config) (bool, error) {
+ r := reflect.TypeOf(s)
+
+ m, ok := r.MethodByName("Init")
+ if !ok {
+ // no Init method is presented, assuming service does not need initialization.
+ return true, nil
+ }
+
+ if err := c.verifySignature(m); err != nil {
+ return false, err
+ }
+
+ // hydrating
+ values, err := c.resolveValues(s, m, segment)
+ if err != nil {
+ return false, err
+ }
+
+ // initiating service
+ out := m.Func.Call(values)
+
+ if out[1].IsNil() {
+ return out[0].Bool(), nil
+ }
+
+ return out[0].Bool(), out[1].Interface().(error)
+}
+
+// resolveValues returns slice of call arguments for service Init method.
+func (c *container) resolveValues(s interface{}, m reflect.Method, cfg Config) (values []reflect.Value, err error) {
+ for i := 0; i < m.Type.NumIn(); i++ {
+ v := m.Type.In(i)
+
+ switch {
+ case v.ConvertibleTo(reflect.ValueOf(s).Type()): // service itself
+ values = append(values, reflect.ValueOf(s))
+
+ case v.Implements(reflect.TypeOf((*Container)(nil)).Elem()): // container
+ values = append(values, reflect.ValueOf(c))
+
+ case v.Implements(reflect.TypeOf((*HydrateConfig)(nil)).Elem()): // injectable config
+ if cfg == nil {
+ return nil, noConfig
+ }
+
+ sc := reflect.New(v.Elem())
+ if err := sc.Interface().(HydrateConfig).Hydrate(cfg); err != nil {
+ return nil, err
+ }
+
+ values = append(values, sc)
+
+ case v.Implements(reflect.TypeOf((*Config)(nil)).Elem()): // generic config section
+ if cfg == nil {
+ return nil, noConfig
+ }
+
+ values = append(values, reflect.ValueOf(cfg))
+
+ default: // dependency on other service (resolution to nil if service can't be found)
+ found := false
+ for _, e := range c.services {
+ if !e.hasStatus(StatusOK) || !v.ConvertibleTo(reflect.ValueOf(e.svc).Type()) {
+ continue
+ }
+
+ found = true
+ values = append(values, reflect.ValueOf(e.svc))
+ break
+ }
+
+ if !found {
+ // placeholder (make sure to check inside the method)
+ values = append(values, reflect.New(v).Elem())
+ }
+ }
+ }
+
+ return
+}
+
+// verifySignature checks if Init method has valid signature
+func (c *container) verifySignature(m reflect.Method) error {
+ if m.Type.NumOut() != 2 {
+ return fmt.Errorf("method Init must have exact 2 return values")
+ }
+
+ if m.Type.Out(0).Kind() != reflect.Bool {
+ return fmt.Errorf("first return value of Init method must be bool type")
+ }
+
+ if !m.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
+ return fmt.Errorf("second return value of Init method value must be error type")
+ }
+
+ return nil
+}
diff --git a/service/container_test.go b/service/container_test.go
index 3092be78..8cb97c74 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -66,7 +66,7 @@ func (t *testService) setChan(c chan interface{}) {
type testCfg struct{ cfg string }
func (cfg *testCfg) Get(name string) Config {
- vars := make(map[string]string)
+ vars := make(map[string]interface{})
json.Unmarshal([]byte(cfg.cfg), &vars)
v, ok := vars[name]
@@ -74,7 +74,8 @@ func (cfg *testCfg) Get(name string) Config {
return nil
}
- return &testCfg{cfg: v}
+ d, _ := json.Marshal(v)
+ return &testCfg{cfg: string(d)}
}
func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
@@ -144,7 +145,7 @@ func TestContainer_Configure(t *testing.T) {
s, st := c.Get("test")
assert.IsType(t, &testService{}, s)
- assert.Equal(t, StatusConfigured, st)
+ assert.Equal(t, StatusOK, st)
}
func TestContainer_ConfigureNull(t *testing.T) {
@@ -176,7 +177,7 @@ func TestContainer_ConfigureDisabled(t *testing.T) {
assert.Equal(t, 1, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 2, len(hook.Entries))
s, st := c.Get("test")
assert.IsType(t, &testService{}, s)
@@ -325,3 +326,106 @@ func TestContainer_ServeErrorMultiple(t *testing.T) {
assert.IsType(t, &testService{}, s)
assert.Equal(t, StatusStopped, st)
}
+
+type testInitA struct{}
+
+func (t *testInitA) Init() error {
+ return nil
+}
+
+type testInitB struct{}
+
+func (t *testInitB) Init() (int, error) {
+ return 0, nil
+}
+
+func TestContainer_InitErrorA(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testInitA{})
+
+ assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`}))
+}
+
+func TestContainer_InitErrorB(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testInitB{})
+
+ assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`}))
+}
+
+type testInitC struct{}
+
+func TestContainer_NoInit(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testInitC{})
+
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`}))
+}
+
+type testInitD struct {
+ c *testInitC
+}
+
+type DCfg struct {
+ V string
+}
+
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *DCfg) Hydrate(cfg Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if c.V == "fail" {
+ return errors.New("failed config")
+ }
+
+ return nil
+}
+
+func (t *testInitD) Init(r *testInitC, c Container, cfg *DCfg) (bool, error) {
+ if r == nil {
+ return false, errors.New("unable to find testInitC")
+ }
+
+ if c == nil {
+ return false, errors.New("unable to find Container")
+ }
+
+ if cfg.V != "ok" {
+ return false, errors.New("invalid config")
+ }
+
+ return false, nil
+}
+
+func TestContainer_InitDependency(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testInitC{})
+ c.Register("test2", &testInitD{})
+
+ assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"ok"}}`}))
+}
+
+func TestContainer_InitDependencyFail(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := NewContainer(logger)
+ c.Register("test", &testInitC{})
+ c.Register("test2", &testInitD{})
+
+ assert.Error(t, c.Init(&testCfg{`{"test":"something", "test2":{"v":"fail"}}`}))
+}
diff --git a/service/service.go b/service/entry.go
index 6cd12b51..f2cbac28 100644
--- a/service/service.go
+++ b/service/entry.go
@@ -1,19 +1,8 @@
package service
-import "sync"
-
-// Service provides high level functionality for road runner modules.
-type Service interface {
- // Init must return configure service and return true if service hasStatus enabled. Must return error in case of
- // misconfiguration. Services must not be used without proper configuration pushed first.
- Init(cfg Config, c Container) (enabled bool, err error)
-
- // Serve serves.
- Serve() error
-
- // Stop stops the service.
- Stop()
-}
+import (
+ "sync"
+)
const (
// StatusUndefined when service bus can not find the service.
@@ -22,8 +11,8 @@ const (
// StatusRegistered hasStatus setStatus when service has been registered in container.
StatusRegistered
- // StatusConfigured hasStatus setStatus when service has been properly configured.
- StatusConfigured
+ // StatusOK hasStatus setStatus when service has been properly configured.
+ StatusOK
// StatusServing hasStatus setStatus when service hasStatus currently done.
StatusServing
@@ -35,7 +24,7 @@ const (
// entry creates association between service instance and given name.
type entry struct {
name string
- svc Service
+ svc interface{}
mu sync.Mutex
status int
}
@@ -59,3 +48,9 @@ func (e *entry) setStatus(status int) {
func (e *entry) hasStatus(status int) bool {
return e.getStatus() == status
}
+
+// canServe returns true is service can serve.
+func (e *entry) canServe() bool {
+ _, ok := e.svc.(Service)
+ return ok
+}
diff --git a/service/entry_test.go b/service/entry_test.go
new file mode 100644
index 00000000..b5c71a10
--- /dev/null
+++ b/service/entry_test.go
@@ -0,0 +1,16 @@
+package service
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestEntry_CanServeFalse(t *testing.T) {
+ e := &entry{svc: nil}
+ assert.False(t, e.canServe())
+}
+
+func TestEntry_CanServeTrue(t *testing.T) {
+ e := &entry{svc: &testService{}}
+ assert.True(t, e.canServe())
+}
diff --git a/service/http/attributes.go b/service/http/attributes.go
deleted file mode 100644
index acea38a1..00000000
--- a/service/http/attributes.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package http
-
-import (
- "context"
- "net/http"
- "errors"
-)
-
-const contextKey = "psr:attributes"
-
-type attrs map[string]interface{}
-
-// InitAttributes returns request with new context and attribute bag.
-func InitAttributes(r *http.Request) *http.Request {
- return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{}))
-}
-
-// AllAttributes returns all context attributes.
-func AllAttributes(r *http.Request) map[string]interface{} {
- v := r.Context().Value(contextKey)
- if v == nil {
- return attrs{}
- }
-
- return v.(attrs)
-}
-
-// Get gets the value from request context. It replaces any existing
-// values.
-func GetAttribute(r *http.Request, key string) interface{} {
- v := r.Context().Value(contextKey)
- if v == nil {
- return nil
- }
-
- return v.(attrs).Get(key)
-}
-
-// Set sets the key to value. It replaces any existing
-// values. Context specific.
-func SetAttribute(r *http.Request, key string, value interface{}) error {
- v := r.Context().Value(contextKey)
- if v == nil {
- return errors.New("unable to find psr:attributes context value")
- }
-
- v.(attrs).Set(key, value)
- return nil
-}
-
-// Get gets the value associated with the given key.
-func (v attrs) Get(key string) interface{} {
- if v == nil {
- return ""
- }
-
- return v[key]
-}
-
-// Set sets the key to value. It replaces any existing
-// values.
-func (v attrs) Set(key string, value interface{}) {
- v[key] = value
-}
-
-// Del deletes the value associated with key.
-func (v attrs) Del(key string) {
- delete(v, key)
-}
diff --git a/service/http/attributes/attributes.go b/service/http/attributes/attributes.go
new file mode 100644
index 00000000..94d0e9c1
--- /dev/null
+++ b/service/http/attributes/attributes.go
@@ -0,0 +1,74 @@
+package attributes
+
+import (
+ "context"
+ "errors"
+ "net/http"
+)
+
+const contextKey = "psr:attributes"
+
+type attrs map[string]interface{}
+
+func (v attrs) get(key string) interface{} {
+ if v == nil {
+ return ""
+ }
+
+ return v[key]
+}
+
+func (v attrs) set(key string, value interface{}) {
+ v[key] = value
+}
+
+func (v attrs) del(key string) {
+ delete(v, key)
+}
+
+// Init returns request with new context and attribute bag.
+func Init(r *http.Request) *http.Request {
+ return r.WithContext(context.WithValue(r.Context(), contextKey, attrs{}))
+}
+
+// All returns all context attributes.
+func All(r *http.Request) map[string]interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return attrs{}
+ }
+
+ return v.(attrs)
+}
+
+// get gets the value from request context. It replaces any existing
+// values.
+func Get(r *http.Request, key string) interface{} {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return nil
+ }
+
+ return v.(attrs).get(key)
+}
+
+// set sets the key to value. It replaces any existing
+// values. Context specific.
+func Set(r *http.Request, key string, value interface{}) error {
+ v := r.Context().Value(contextKey)
+ if v == nil {
+ return errors.New("unable to find `psr:attributes` context key")
+ }
+
+ v.(attrs).set(key, value)
+ return nil
+}
+
+// Delete deletes values associated with attribute key.
+func (v attrs) Delete(key string) {
+ if v == nil {
+ return
+ }
+
+ v.del(key)
+}
diff --git a/service/http/attributes/attributes_test.go b/service/http/attributes/attributes_test.go
new file mode 100644
index 00000000..a71d6542
--- /dev/null
+++ b/service/http/attributes/attributes_test.go
@@ -0,0 +1,67 @@
+package attributes
+
+import (
+ "github.com/stretchr/testify/assert"
+ "net/http"
+ "testing"
+)
+
+func TestAllAttributes(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ Set(r, "key", "value")
+
+ assert.Equal(t, All(r), map[string]interface{}{
+ "key": "value",
+ })
+}
+
+func TestAllAttributesNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestAllAttributesNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, All(r), map[string]interface{}{})
+}
+
+func TestGetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ Set(r, "key", "value")
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestGetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestGetAttributeNone2(t *testing.T) {
+ r := &http.Request{}
+
+ assert.Equal(t, Get(r, "key"), nil)
+}
+
+func TestSetAttribute(t *testing.T) {
+ r := &http.Request{}
+ r = Init(r)
+
+ Set(r, "key", "value")
+ assert.Equal(t, Get(r, "key"), "value")
+}
+
+func TestSetAttributeNone(t *testing.T) {
+ r := &http.Request{}
+
+ Set(r, "key", "value")
+ assert.Equal(t, Get(r, "key"), nil)
+}
diff --git a/service/http/attributes_test.go b/service/http/attributes_test.go
deleted file mode 100644
index aeb7fe74..00000000
--- a/service/http/attributes_test.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package http
-
-import (
- "testing"
- "net/http"
- "github.com/stretchr/testify/assert"
-)
-
-func TestAllAttributes(t *testing.T) {
- r := &http.Request{}
- r = InitAttributes(r)
-
- SetAttribute(r, "key", "value")
-
- assert.Equal(t, AllAttributes(r), map[string]interface{}{
- "key": "value",
- })
-}
-
-func TestAllAttributesNone(t *testing.T) {
- r := &http.Request{}
- r = InitAttributes(r)
-
- assert.Equal(t, AllAttributes(r), map[string]interface{}{})
-}
-
-func TestAllAttributesNone2(t *testing.T) {
- r := &http.Request{}
-
- assert.Equal(t, AllAttributes(r), map[string]interface{}{})
-}
-
-func TestGetAttribute(t *testing.T) {
- r := &http.Request{}
- r = InitAttributes(r)
-
- SetAttribute(r, "key", "value")
- assert.Equal(t, GetAttribute(r, "key"), "value")
-}
-
-func TestGetAttributeNone(t *testing.T) {
- r := &http.Request{}
- r = InitAttributes(r)
-
- assert.Equal(t, GetAttribute(r, "key"), nil)
-}
-
-func TestGetAttributeNone2(t *testing.T) {
- r := &http.Request{}
-
- assert.Equal(t, GetAttribute(r, "key"), nil)
-}
-
-func TestSetAttribute(t *testing.T) {
- r := &http.Request{}
- r = InitAttributes(r)
-
- SetAttribute(r, "key", "value")
- assert.Equal(t, GetAttribute(r, "key"), "value")
-}
-
-func TestSetAttributeNone(t *testing.T) {
- r := &http.Request{}
-
- SetAttribute(r, "key", "value")
- assert.Equal(t, GetAttribute(r, "key"), nil)
-} \ No newline at end of file
diff --git a/service/http/config.go b/service/http/config.go
index 19a2e71d..20a247fb 100644
--- a/service/http/config.go
+++ b/service/http/config.go
@@ -3,7 +3,9 @@ package http
import (
"errors"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
"strings"
+ "time"
)
// Config configures RoadRunner HTTP server.
@@ -24,25 +26,54 @@ type Config struct {
Workers *roadrunner.ServerConfig
}
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ if err := c.Valid(); err != nil {
+ return err
+ }
+
+ if c.Workers.Relay == "" {
+ c.Workers.Relay = "pipes"
+ }
+
+ if c.Workers.RelayTimeout < time.Microsecond {
+ c.Workers.RelayTimeout = time.Second * time.Duration(c.Workers.RelayTimeout.Nanoseconds())
+ }
+
+ if c.Workers.Pool.AllocateTimeout < time.Microsecond {
+ c.Workers.Pool.AllocateTimeout = time.Second * time.Duration(c.Workers.Pool.AllocateTimeout.Nanoseconds())
+ }
+
+ if c.Workers.Pool.DestroyTimeout < time.Microsecond {
+ c.Workers.Pool.DestroyTimeout = time.Second * time.Duration(c.Workers.Pool.DestroyTimeout.Nanoseconds())
+ }
+
+ return nil
+}
+
// Valid validates the configuration.
-func (cfg *Config) Valid() error {
- if cfg.Uploads == nil {
+func (c *Config) Valid() error {
+ if c.Uploads == nil {
return errors.New("mailformed uploads config")
}
- if cfg.Workers == nil {
+ if c.Workers == nil {
return errors.New("mailformed workers config")
}
- if cfg.Workers.Pool == nil {
+ if c.Workers.Pool == nil {
return errors.New("mailformed workers config (pool config is missing)")
}
- if err := cfg.Workers.Pool.Valid(); err != nil {
+ if err := c.Workers.Pool.Valid(); err != nil {
return err
}
- if !strings.Contains(cfg.Address, ":") {
+ if !strings.Contains(c.Address, ":") {
return errors.New("mailformed server address")
}
diff --git a/service/http/config_test.go b/service/http/config_test.go
index cb804f4a..2e3fe731 100644
--- a/service/http/config_test.go
+++ b/service/http/config_test.go
@@ -1,13 +1,34 @@
package http
import (
+ "encoding/json"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
"os"
"testing"
"time"
)
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := &mockCfg{`{"enable": true}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
func Test_Config_Valid(t *testing.T) {
cfg := &Config{
Enable: true,
diff --git a/service/http/handler.go b/service/http/handler.go
index 6f2617b1..9e67e5b4 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -9,26 +9,29 @@ import (
)
const (
- // EventResponse thrown after the request been processed. See Event as payload.
+ // EventResponse thrown after the request been processed. See ErrorEvent as payload.
EventResponse = iota + 500
// EventError thrown on any non job error provided by road runner server.
EventError
)
-// Event represents singular http response event.
-type Event struct {
- // Method of the request.
- Method string
+// ErrorEvent represents singular http error event.
+type ErrorEvent struct {
+ // Request contains client request, must not be stored.
+ Request *http.Request
- // URI requested by the client.
- URI string
+ // Error - associated error, if any.
+ Error error
+}
- // Status is response status.
- Status int
+// ResponseEvent represents singular http response event.
+type ResponseEvent struct {
+ // Request contains client request, must not be stored.
+ Request *Request
- // Associated error, if any.
- Error error
+ // Response contains service response.
+ Response *Response
}
// Handler serves http connections to underlying PHP application using PSR-7 protocol. Context will include request headers,
@@ -99,7 +102,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// handleError sends error.
func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error) {
- h.throw(EventError, &Event{Method: r.Method, URI: uri(r), Status: 500, Error: err})
+ h.throw(EventError, &ErrorEvent{Request: r, Error: err})
w.WriteHeader(500)
w.Write([]byte(err.Error()))
@@ -107,7 +110,7 @@ func (h *Handler) handleError(w http.ResponseWriter, r *http.Request, err error)
// handleResponse triggers response event.
func (h *Handler) handleResponse(req *Request, resp *Response) {
- h.throw(EventResponse, &Event{Method: req.Method, URI: req.URI, Status: resp.Status})
+ h.throw(EventResponse, &ResponseEvent{Request: req, Response: resp})
}
// throw invokes event srv if any.
diff --git a/service/http/request.go b/service/http/request.go
index 21566416..6d5cc126 100644
--- a/service/http/request.go
+++ b/service/http/request.go
@@ -4,7 +4,9 @@ import (
"encoding/json"
"fmt"
"github.com/spiral/roadrunner"
+ "github.com/spiral/roadrunner/service/http/attributes"
"io/ioutil"
+ "net"
"net/http"
"net/url"
"strings"
@@ -20,6 +22,9 @@ const (
// Request maps net/http requests to PSR7 compatible structure and managed state of temporary uploaded files.
type Request struct {
+ // RemoteAddr contains ip address of client, make sure to check X-Real-Ip and X-Forwarded-For for real client address.
+ RemoteAddr string `json:"remoteAddr"`
+
// Protocol includes HTTP protocol version.
Protocol string `json:"protocol"`
@@ -60,7 +65,14 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
Headers: r.Header,
Cookies: make(map[string]string),
RawQuery: r.URL.RawQuery,
- Attributes: AllAttributes(r),
+ Attributes: attributes.All(r),
+ }
+
+ // otherwise, return remote address as is
+ if strings.ContainsRune(r.RemoteAddr, ':') {
+ req.RemoteAddr, _, _ = net.SplitHostPort(r.RemoteAddr)
+ } else {
+ req.RemoteAddr = r.RemoteAddr
}
for _, c := range r.Cookies() {
diff --git a/service/http/service.go b/service/http/service.go
index 710cd60c..f7fdf2ab 100644
--- a/service/http/service.go
+++ b/service/http/service.go
@@ -3,7 +3,7 @@ package http
import (
"context"
"github.com/spiral/roadrunner"
- "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/service/http/attributes"
"github.com/spiral/roadrunner/service/rpc"
"net/http"
"sync"
@@ -41,28 +41,14 @@ func (s *Service) AddListener(l func(event int, ctx interface{})) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg service.Config, c service.Container) (bool, error) {
- config := &Config{}
-
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
+func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) {
+ if !cfg.Enable {
return false, nil
}
- if err := config.Valid(); err != nil {
- return false, err
- }
-
- s.cfg = config
-
- // registering http RPC interface
- if r, ok := c.Get(rpc.ID); ok >= service.StatusConfigured {
- if h, ok := r.(*rpc.Service); ok {
- h.Register(ID, &rpcServer{s})
- }
+ s.cfg = cfg
+ if r != nil {
+ r.Register(ID, &rpcServer{s})
}
return true, nil
@@ -113,16 +99,17 @@ func (s *Service) Stop() {
// middleware handles connection using set of mdws and rr PSR-7 server.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- r = InitAttributes(r)
+ r = attributes.Init(r)
+ // chaining middlewares
f := s.srv.ServeHTTP
for _, m := range s.mdws {
f = m(f)
}
-
f(w, r)
}
+// listener handles service, server and pool events.
func (s *Service) listener(event int, ctx interface{}) {
for _, l := range s.lsns {
l(event, ctx)
diff --git a/service/http/service_test.go b/service/http/service_test.go
index 50836b4b..b442ae51 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -42,7 +42,7 @@ func Test_Service_NoConfig(t *testing.T) {
c := service.NewContainer(logger)
c.Register(ID, &Service{})
- assert.NoError(t, c.Init(&testCfg{httpCfg: `{}`}))
+ assert.Error(t, c.Init(&testCfg{httpCfg: `{}`}))
s, st := c.Get(ID)
assert.NotNil(t, s)
@@ -108,7 +108,7 @@ func Test_Service_Configure_Enable(t *testing.T) {
s, st := c.Get(ID)
assert.NotNil(t, s)
- assert.Equal(t, service.StatusConfigured, st)
+ assert.Equal(t, service.StatusOK, st)
}
func Test_Service_Echo(t *testing.T) {
@@ -139,10 +139,10 @@ func Test_Service_Echo(t *testing.T) {
s, st := c.Get(ID)
assert.NotNil(t, s)
- assert.Equal(t, service.StatusConfigured, st)
+ assert.Equal(t, service.StatusOK, st)
// should do nothing
- s.Stop()
+ s.(*Service).Stop()
go func() { c.Serve() }()
time.Sleep(time.Millisecond * 100)
@@ -191,7 +191,7 @@ func Test_Service_ErrorEcho(t *testing.T) {
s, st := c.Get(ID)
assert.NotNil(t, s)
- assert.Equal(t, service.StatusConfigured, st)
+ assert.Equal(t, service.StatusOK, st)
goterr := make(chan interface{})
s.(*Service).AddListener(func(event int, ctx interface{}) {
@@ -251,7 +251,7 @@ func Test_Service_Middleware(t *testing.T) {
s, st := c.Get(ID)
assert.NotNil(t, s)
- assert.Equal(t, service.StatusConfigured, st)
+ assert.Equal(t, service.StatusOK, st)
s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
@@ -325,7 +325,7 @@ func Test_Service_Listener(t *testing.T) {
s, st := c.Get(ID)
assert.NotNil(t, s)
- assert.Equal(t, service.StatusConfigured, st)
+ assert.Equal(t, service.StatusOK, st)
stop := make(chan interface{})
s.(*Service).AddListener(func(event int, ctx interface{}) {
diff --git a/service/rpc/config.go b/service/rpc/config.go
index e3168945..c37b0853 100644
--- a/service/rpc/config.go
+++ b/service/rpc/config.go
@@ -2,12 +2,14 @@ package rpc
import (
"errors"
+ "github.com/spiral/roadrunner/service"
"net"
"strings"
"syscall"
)
-type config struct {
+// Config defines RPC service config.
+type Config struct {
// Indicates if RPC connection is enabled.
Enable bool
@@ -15,9 +17,31 @@ type config struct {
Listen string
}
-// listener creates new rpc socket listener.
-func (cfg *config) listener() (net.Listener, error) {
- dsn := strings.Split(cfg.Listen, "://")
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+
+ return c.Valid()
+}
+
+// Valid returns nil if config is valid.
+func (c *Config) Valid() error {
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ if dsn := strings.Split(c.Listen, "://"); len(dsn) != 2 {
+ return errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
+ }
+
+ return nil
+}
+
+// Listener creates new rpc socket Listener.
+func (c *Config) Listener() (net.Listener, error) {
+ dsn := strings.Split(c.Listen, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
}
@@ -29,9 +53,9 @@ func (cfg *config) listener() (net.Listener, error) {
return net.Listen(dsn[0], dsn[1])
}
-// dialer creates rpc socket dialer.
-func (cfg *config) dialer() (net.Conn, error) {
- dsn := strings.Split(cfg.Listen, "://")
+// Dialer creates rpc socket Dialer.
+func (c *Config) Dialer() (net.Conn, error) {
+ dsn := strings.Split(c.Listen, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid socket DSN (tcp://:6001, unix://rpc.sock)")
}
diff --git a/service/rpc/config_test.go b/service/rpc/config_test.go
index a953e30e..a7c51c0f 100644
--- a/service/rpc/config_test.go
+++ b/service/rpc/config_test.go
@@ -1,15 +1,36 @@
package rpc
import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
"runtime"
"testing"
)
+type testCfg struct{ cfg string }
+
+func (cfg *testCfg) Get(name string) service.Config { return nil }
+func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate(t *testing.T) {
+ cfg := &testCfg{`{"enable": true, "listen": "tcp://:18001"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &testCfg{`{"enable": true, "listen": "invalid"}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
func TestConfig_Listener(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
+ cfg := &Config{Listen: "tcp://:18001"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
defer ln.Close()
@@ -23,9 +44,9 @@ func TestConfig_ListenerUnix(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "unix://rpc.sock"}
+ cfg := &Config{Listen: "unix://rpc.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.NoError(t, err)
assert.NotNil(t, ln)
defer ln.Close()
@@ -39,28 +60,28 @@ func Test_Config_Error(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.listener()
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Listener()
assert.Nil(t, ln)
assert.Error(t, err)
assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
}
func Test_Config_ErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
+ cfg := &Config{Listen: "xinu://unix.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
assert.Nil(t, ln)
assert.Error(t, err)
}
func TestConfig_Dialer(t *testing.T) {
- cfg := &config{Listen: "tcp://:18001"}
+ cfg := &Config{Listen: "tcp://:18001"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
defer ln.Close()
- conn, err := cfg.dialer()
+ conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()
@@ -74,12 +95,12 @@ func TestConfig_DialerUnix(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "unix://rpc.sock"}
+ cfg := &Config{Listen: "unix://rpc.sock"}
- ln, err := cfg.listener()
+ ln, err := cfg.Listener()
defer ln.Close()
- conn, err := cfg.dialer()
+ conn, err := cfg.Dialer()
assert.NoError(t, err)
assert.NotNil(t, conn)
defer conn.Close()
@@ -93,17 +114,17 @@ func Test_Config_DialerError(t *testing.T) {
t.Skip("not supported on " + runtime.GOOS)
}
- cfg := &config{Listen: "uni:unix.sock"}
- ln, err := cfg.dialer()
+ cfg := &Config{Listen: "uni:unix.sock"}
+ ln, err := cfg.Dialer()
assert.Nil(t, ln)
assert.Error(t, err)
assert.Equal(t, "invalid socket DSN (tcp://:6001, unix://rpc.sock)", err.Error())
}
func Test_Config_DialerErrorMethod(t *testing.T) {
- cfg := &config{Listen: "xinu://unix.sock"}
+ cfg := &Config{Listen: "xinu://unix.sock"}
- ln, err := cfg.dialer()
+ ln, err := cfg.Dialer()
assert.Nil(t, ln)
assert.Error(t, err)
}
diff --git a/service/rpc/service.go b/service/rpc/service.go
index 82f26407..6e231048 100644
--- a/service/rpc/service.go
+++ b/service/rpc/service.go
@@ -3,7 +3,6 @@ package rpc
import (
"errors"
"github.com/spiral/goridge"
- "github.com/spiral/roadrunner/service"
"net/rpc"
"sync"
)
@@ -13,27 +12,20 @@ const ID = "rpc"
// Service is RPC service.
type Service struct {
- cfg *config
- stop chan interface{}
- rpc *rpc.Server
-
+ cfg *Config
+ stop chan interface{}
+ rpc *rpc.Server
mu sync.Mutex
serving bool
}
-// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
-// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg service.Config, reg service.Container) (enabled bool, err error) {
- config := &config{}
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
+// Init rpc service. Must return true if service is enabled.
+func (s *Service) Init(cfg *Config) (bool, error) {
+ if !cfg.Enable {
return false, nil
}
- s.cfg = config
+ s.cfg = cfg
s.rpc = rpc.NewServer()
return true, nil
@@ -50,7 +42,7 @@ func (s *Service) Serve() error {
s.stop = make(chan interface{})
s.mu.Unlock()
- ln, err := s.cfg.listener()
+ ln, err := s.cfg.Listener()
if err != nil {
return err
}
@@ -99,12 +91,12 @@ func (s *Service) Stop() {
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
-func (s *Service) Register(name string, rcvr interface{}) error {
+func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
return errors.New("RPC service is not configured")
}
- return s.rpc.RegisterName(name, rcvr)
+ return s.rpc.RegisterName(name, svc)
}
// Client creates new RPC client.
@@ -113,7 +105,7 @@ func (s *Service) Client() (*rpc.Client, error) {
return nil, errors.New("RPC service is not configured")
}
- conn, err := s.cfg.dialer()
+ conn, err := s.cfg.Dialer()
if err != nil {
return nil, err
}
diff --git a/service/rpc/service_test.go b/service/rpc/service_test.go
index d4734bb5..fc88d38d 100644
--- a/service/rpc/service_test.go
+++ b/service/rpc/service_test.go
@@ -1,8 +1,6 @@
package rpc
import (
- "encoding/json"
- "github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
"testing"
"time"
@@ -12,22 +10,9 @@ type testService struct{}
func (ts *testService) Echo(msg string, r *string) error { *r = msg; return nil }
-type testCfg struct{ cfg string }
-
-func (cfg *testCfg) Get(name string) service.Config { return nil }
-func (cfg *testCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_ConfigError(t *testing.T) {
- s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":false`}, nil)
-
- assert.Error(t, err)
- assert.False(t, ok)
-}
-
func Test_Disabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":false}`}, nil)
+ ok, err := s.Init(&Config{Enable: false})
assert.NoError(t, err)
assert.False(t, ok)
@@ -45,7 +30,7 @@ func Test_RegisterNotConfigured(t *testing.T) {
func Test_Enabled(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -53,7 +38,7 @@ func Test_Enabled(t *testing.T) {
func Test_StopNonServing(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9008"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9008"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -62,7 +47,7 @@ func Test_StopNonServing(t *testing.T) {
func Test_Serve_Errors(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"mailformed"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "mailformed"})
assert.NoError(t, err)
assert.True(t, ok)
@@ -75,7 +60,7 @@ func Test_Serve_Errors(t *testing.T) {
func Test_Serve_Client(t *testing.T) {
s := &Service{}
- ok, err := s.Init(&testCfg{`{"enable":true, "listen":"tcp://localhost:9018"}`}, nil)
+ ok, err := s.Init(&Config{Enable: true, Listen: "tcp://localhost:9018"})
assert.NoError(t, err)
assert.True(t, ok)
diff --git a/service/static/config.go b/service/static/config.go
index 1020b8cd..95fdbeee 100644
--- a/service/static/config.go
+++ b/service/static/config.go
@@ -2,6 +2,7 @@ package static
import (
"github.com/pkg/errors"
+ "github.com/spiral/roadrunner/service"
"os"
"path"
"strings"
@@ -20,22 +21,18 @@ type Config struct {
Forbid []string
}
-// Forbids must return true if file extension is not allowed for the upload.
-func (cfg *Config) Forbids(filename string) bool {
- ext := strings.ToLower(path.Ext(filename))
-
- for _, v := range cfg.Forbid {
- if ext == v {
- return true
- }
+// Hydrate must populate Config values using given Config source. Must return error if Config is not valid.
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
}
- return false
+ return c.Valid()
}
-// Valid validates existence of directory.
-func (cfg *Config) Valid() error {
- st, err := os.Stat(cfg.Dir)
+// Valid returns nil if config is valid.
+func (c *Config) Valid() error {
+ st, err := os.Stat(c.Dir)
if err != nil {
if os.IsNotExist(err) {
return errors.New("root directory does not exists")
@@ -50,3 +47,16 @@ func (cfg *Config) Valid() error {
return nil
}
+
+// Forbids must return true if file extension is not allowed for the upload.
+func (c *Config) Forbids(filename string) bool {
+ ext := strings.ToLower(path.Ext(filename))
+
+ for _, v := range c.Forbid {
+ if ext == v {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/service/static/config_test.go b/service/static/config_test.go
index d2099cdf..18168d59 100644
--- a/service/static/config_test.go
+++ b/service/static/config_test.go
@@ -1,10 +1,31 @@
package static
import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
"github.com/stretchr/testify/assert"
"testing"
)
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "./"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
func TestConfig_Forbids(t *testing.T) {
cfg := Config{Forbid: []string{".php"}}
diff --git a/service/static/service.go b/service/static/service.go
index add242e4..98d8313c 100644
--- a/service/static/service.go
+++ b/service/static/service.go
@@ -1,7 +1,6 @@
package static
import (
- "github.com/spiral/roadrunner/service"
rrttp "github.com/spiral/roadrunner/service/http"
"net/http"
"path"
@@ -22,39 +21,18 @@ type Service struct {
// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Service) Init(cfg service.Config, c service.Container) (enabled bool, err error) {
- config := &Config{}
- if err := cfg.Unmarshal(config); err != nil {
- return false, err
- }
-
- if !config.Enable {
+func (s *Service) Init(cfg *Config, r *rrttp.Service) (bool, error) {
+ if !cfg.Enable || r == nil {
return false, nil
}
- if err := config.Valid(); err != nil {
- return false, err
- }
-
- s.cfg = config
+ s.cfg = cfg
s.root = http.Dir(s.cfg.Dir)
-
- // registering as middleware
- if h, ok := c.Get(rrttp.ID); ok >= service.StatusConfigured {
- if h, ok := h.(*rrttp.Service); ok {
- h.AddMiddleware(s.middleware)
- }
- }
+ r.AddMiddleware(s.middleware)
return true, nil
}
-// Serve serves the service.
-func (s *Service) Serve() error { return nil }
-
-// Stop stops the service.
-func (s *Service) Stop() {}
-
// middleware must return true if request/response pair is handled within the middleware.
func (s *Service) middleware(f http.HandlerFunc) http.HandlerFunc {
// Define the http.HandlerFunc