summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/rr/cmd/version.go2
-rw-r--r--error_buffer.go82
-rw-r--r--error_buffer_test.go20
-rw-r--r--php-src/tests/http/echoerr.php12
-rw-r--r--service/http/service_test.go60
-rw-r--r--static_pool.go2
-rw-r--r--worker.go2
7 files changed, 148 insertions, 32 deletions
diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go
index 45631fbb..fb7a2f88 100644
--- a/cmd/rr/cmd/version.go
+++ b/cmd/rr/cmd/version.go
@@ -2,7 +2,7 @@ package cmd
var (
// Version - defines build version.
- Version = "1.0.0"
+ Version = "1.0.0"
// BuildTime - defined build time.
BuildTime = "development"
)
diff --git a/error_buffer.go b/error_buffer.go
index 78effc9b..8be9c5a8 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -1,24 +1,69 @@
package roadrunner
import (
- "bytes"
"sync"
+ "time"
)
-// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
-const EventStderrOutput = 1900
+const (
+ // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
+ EventStderrOutput = 1900
+
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output
+ // together since lastError update (required to keep error update together).
+ WaitDuration = 100 * time.Millisecond
+)
// thread safe errBuffer
type errBuffer struct {
- mu sync.Mutex
- buf []byte
- off int
- lsn func(event int, ctx interface{})
+ mu sync.Mutex
+ buf []byte
+ last int
+ wait *time.Timer
+ update chan interface{}
+ stop chan interface{}
+ lsn func(event int, ctx interface{})
}
func newErrBuffer() *errBuffer {
- buf := &errBuffer{buf: make([]byte, 0)}
- return buf
+ eb := &errBuffer{
+ buf: make([]byte, 0),
+ update: make(chan interface{}),
+ wait: time.NewTimer(WaitDuration),
+ stop: make(chan interface{}),
+ }
+
+ go func() {
+ for {
+ select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
+ case <-eb.wait.C:
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ case <-eb.stop:
+ eb.wait.Stop()
+
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return eb
}
// Listen attaches error stream even listener.
@@ -36,7 +81,7 @@ func (eb *errBuffer) Len() int {
defer eb.mu.Unlock()
// currently active message
- return len(eb.buf) - eb.off
+ return len(eb.buf)
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
@@ -46,11 +91,7 @@ func (eb *errBuffer) Write(p []byte) (int, error) {
defer eb.mu.Unlock()
eb.buf = append(eb.buf, p...)
- for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() {
- if eb.lsn != nil {
- eb.lsn(EventStderrOutput, msg)
- }
- }
+ eb.update <- nil
return len(p), nil
}
@@ -60,14 +101,11 @@ func (eb *errBuffer) String() string {
eb.mu.Lock()
defer eb.mu.Unlock()
- return string(eb.buf[eb.off:])
+ return string(eb.buf)
}
-func (eb *errBuffer) fetchMsg() []byte {
- if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 {
- eb.off += i + 2
- return eb.buf[eb.off-i-2 : eb.off]
- }
-
+// Close aggregation timer.
+func (eb *errBuffer) Close() error {
+ close(eb.stop)
return nil
}
diff --git a/error_buffer_test.go b/error_buffer_test.go
index 2bf98915..09ea4f03 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -7,6 +7,7 @@ import (
func TestErrBuffer_Write_Len(t *testing.T) {
buf := newErrBuffer()
+ defer buf.Close()
buf.Write([]byte("hello"))
assert.Equal(t, 5, buf.Len())
@@ -15,44 +16,47 @@ func TestErrBuffer_Write_Len(t *testing.T) {
func TestErrBuffer_Write_Event(t *testing.T) {
buf := newErrBuffer()
+ defer buf.Close()
tr := make(chan interface{})
buf.Listen(func(event int, ctx interface{}) {
assert.Equal(t, EventStderrOutput, event)
- assert.Equal(t, []byte("hello\n\n"), ctx)
+ assert.Equal(t, []byte("hello\n"), ctx)
close(tr)
})
- buf.Write([]byte("hello\n\n"))
+ buf.Write([]byte("hello\n"))
<-tr
// messages are read
- assert.Equal(t, 0, buf.Len())
- assert.Equal(t, "", buf.String())
+ assert.Equal(t, 6, buf.Len())
+ assert.Equal(t, "hello\n", buf.String())
}
func TestErrBuffer_Write_Event_Separated(t *testing.T) {
buf := newErrBuffer()
+ defer buf.Close()
tr := make(chan interface{})
buf.Listen(func(event int, ctx interface{}) {
assert.Equal(t, EventStderrOutput, event)
- assert.Equal(t, []byte("hello\n\n"), ctx)
+ assert.Equal(t, []byte("hello\nending"), ctx)
close(tr)
})
buf.Write([]byte("hel"))
- buf.Write([]byte("lo\n\n"))
+ buf.Write([]byte("lo\n"))
buf.Write([]byte("ending"))
<-tr
- assert.Equal(t, 6, buf.Len())
- assert.Equal(t, "ending", buf.String())
+ assert.Equal(t, 12, buf.Len())
+ assert.Equal(t, "hello\nending", buf.String())
}
func TestErrBuffer_Write_Remaining(t *testing.T) {
buf := newErrBuffer()
+ defer buf.Close()
buf.Write([]byte("hel"))
diff --git a/php-src/tests/http/echoerr.php b/php-src/tests/http/echoerr.php
new file mode 100644
index 00000000..da2ff4d8
--- /dev/null
+++ b/php-src/tests/http/echoerr.php
@@ -0,0 +1,12 @@
+<?php
+
+use \Psr\Http\Message\ServerRequestInterface;
+use \Psr\Http\Message\ResponseInterface;
+
+function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface
+{
+ error_log(strtoupper($req->getQueryParams()['hello']));
+
+ $resp->getBody()->write(strtoupper($req->getQueryParams()['hello']));
+ return $resp->withStatus(201);
+} \ No newline at end of file
diff --git a/service/http/service_test.go b/service/http/service_test.go
index 55fa660b..02d1c3f0 100644
--- a/service/http/service_test.go
+++ b/service/http/service_test.go
@@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) {
assert.Equal(t, "WORLD", string(b))
}
+func Test_Service_ErrorEcho(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{httpCfg: `{
+ "enable": true,
+ "address": ":6029",
+ "maxRequest": 1024,
+ "uploads": {
+ "dir": ` + tmpDir() + `,
+ "forbid": []
+ },
+ "workers":{
+ "command": "php ../../php-src/tests/http/client.php echoerr pipes",
+ "relay": "pipes",
+ "pool": {
+ "numWorkers": 1,
+ "allocateTimeout": 10000000,
+ "destroyTimeout": 10000000
+ }
+ }
+ }`}))
+
+ s, st := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusConfigured, st)
+
+ goterr := make(chan interface{})
+ s.(*Service).AddListener(func(event int, ctx interface{}) {
+ if event == roadrunner.EventStderrOutput {
+ if string(ctx.([]byte)) == "WORLD\n" {
+ goterr <- nil
+ }
+ }
+ })
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+
+ <-goterr
+
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+}
+
func Test_Service_Middleware(t *testing.T) {
logger, _ := test.NewNullLogger()
logger.SetLevel(logrus.DebugLevel)
diff --git a/static_pool.go b/static_pool.go
index 48ff4853..b974ac90 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -86,7 +86,7 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.muw.Lock()
for _, w := range p.workers {
- w.err.Listen(l)
+ w.err.Listen(p.lsn)
}
p.muw.Unlock()
}
diff --git a/worker.go b/worker.go
index d1612edd..c52960b2 100644
--- a/worker.go
+++ b/worker.go
@@ -211,6 +211,8 @@ func (w *Worker) start() error {
if w.rl != nil {
w.rl.Close()
}
+
+ w.err.Close()
}
}()