diff options
-rw-r--r-- | cmd/rr/cmd/version.go | 2 | ||||
-rw-r--r-- | error_buffer.go | 82 | ||||
-rw-r--r-- | error_buffer_test.go | 20 | ||||
-rw-r--r-- | php-src/tests/http/echoerr.php | 12 | ||||
-rw-r--r-- | service/http/service_test.go | 60 | ||||
-rw-r--r-- | static_pool.go | 2 | ||||
-rw-r--r-- | worker.go | 2 |
7 files changed, 148 insertions, 32 deletions
diff --git a/cmd/rr/cmd/version.go b/cmd/rr/cmd/version.go index 45631fbb..fb7a2f88 100644 --- a/cmd/rr/cmd/version.go +++ b/cmd/rr/cmd/version.go @@ -2,7 +2,7 @@ package cmd var ( // Version - defines build version. - Version = "1.0.0" + Version = "1.0.0" // BuildTime - defined build time. BuildTime = "development" ) diff --git a/error_buffer.go b/error_buffer.go index 78effc9b..8be9c5a8 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -1,24 +1,69 @@ package roadrunner import ( - "bytes" "sync" + "time" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). -const EventStderrOutput = 1900 +const ( + // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). + EventStderrOutput = 1900 + + // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output + // together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond +) // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buf []byte - off int - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + last int + wait *time.Timer + update chan interface{} + stop chan interface{} + lsn func(event int, ctx interface{}) } func newErrBuffer() *errBuffer { - buf := &errBuffer{buf: make([]byte, 0)} - return buf + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + } + + go func() { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return + } + } + }() + + return eb } // Listen attaches error stream even listener. @@ -36,7 +81,7 @@ func (eb *errBuffer) Len() int { defer eb.mu.Unlock() // currently active message - return len(eb.buf) - eb.off + return len(eb.buf) } // Write appends the contents of p to the errBuffer, growing the errBuffer as @@ -46,11 +91,7 @@ func (eb *errBuffer) Write(p []byte) (int, error) { defer eb.mu.Unlock() eb.buf = append(eb.buf, p...) - for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { - if eb.lsn != nil { - eb.lsn(EventStderrOutput, msg) - } - } + eb.update <- nil return len(p), nil } @@ -60,14 +101,11 @@ func (eb *errBuffer) String() string { eb.mu.Lock() defer eb.mu.Unlock() - return string(eb.buf[eb.off:]) + return string(eb.buf) } -func (eb *errBuffer) fetchMsg() []byte { - if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { - eb.off += i + 2 - return eb.buf[eb.off-i-2 : eb.off] - } - +// Close aggregation timer. +func (eb *errBuffer) Close() error { + close(eb.stop) return nil } diff --git a/error_buffer_test.go b/error_buffer_test.go index 2bf98915..09ea4f03 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -7,6 +7,7 @@ import ( func TestErrBuffer_Write_Len(t *testing.T) { buf := newErrBuffer() + defer buf.Close() buf.Write([]byte("hello")) assert.Equal(t, 5, buf.Len()) @@ -15,44 +16,47 @@ func TestErrBuffer_Write_Len(t *testing.T) { func TestErrBuffer_Write_Event(t *testing.T) { buf := newErrBuffer() + defer buf.Close() tr := make(chan interface{}) buf.Listen(func(event int, ctx interface{}) { assert.Equal(t, EventStderrOutput, event) - assert.Equal(t, []byte("hello\n\n"), ctx) + assert.Equal(t, []byte("hello\n"), ctx) close(tr) }) - buf.Write([]byte("hello\n\n")) + buf.Write([]byte("hello\n")) <-tr // messages are read - assert.Equal(t, 0, buf.Len()) - assert.Equal(t, "", buf.String()) + assert.Equal(t, 6, buf.Len()) + assert.Equal(t, "hello\n", buf.String()) } func TestErrBuffer_Write_Event_Separated(t *testing.T) { buf := newErrBuffer() + defer buf.Close() tr := make(chan interface{}) buf.Listen(func(event int, ctx interface{}) { assert.Equal(t, EventStderrOutput, event) - assert.Equal(t, []byte("hello\n\n"), ctx) + assert.Equal(t, []byte("hello\nending"), ctx) close(tr) }) buf.Write([]byte("hel")) - buf.Write([]byte("lo\n\n")) + buf.Write([]byte("lo\n")) buf.Write([]byte("ending")) <-tr - assert.Equal(t, 6, buf.Len()) - assert.Equal(t, "ending", buf.String()) + assert.Equal(t, 12, buf.Len()) + assert.Equal(t, "hello\nending", buf.String()) } func TestErrBuffer_Write_Remaining(t *testing.T) { buf := newErrBuffer() + defer buf.Close() buf.Write([]byte("hel")) diff --git a/php-src/tests/http/echoerr.php b/php-src/tests/http/echoerr.php new file mode 100644 index 00000000..da2ff4d8 --- /dev/null +++ b/php-src/tests/http/echoerr.php @@ -0,0 +1,12 @@ +<?php + +use \Psr\Http\Message\ServerRequestInterface; +use \Psr\Http\Message\ResponseInterface; + +function handleRequest(ServerRequestInterface $req, ResponseInterface $resp): ResponseInterface +{ + error_log(strtoupper($req->getQueryParams()['hello'])); + + $resp->getBody()->write(strtoupper($req->getQueryParams()['hello'])); + return $resp->withStatus(201); +}
\ No newline at end of file diff --git a/service/http/service_test.go b/service/http/service_test.go index 55fa660b..02d1c3f0 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -163,6 +163,66 @@ func Test_Service_Echo(t *testing.T) { assert.Equal(t, "WORLD", string(b)) } +func Test_Service_ErrorEcho(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + "enable": true, + "address": ":6029", + "maxRequest": 1024, + "uploads": { + "dir": ` + tmpDir() + `, + "forbid": [] + }, + "workers":{ + "command": "php ../../php-src/tests/http/client.php echoerr pipes", + "relay": "pipes", + "pool": { + "numWorkers": 1, + "allocateTimeout": 10000000, + "destroyTimeout": 10000000 + } + } + }`})) + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusConfigured, st) + + goterr := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventStderrOutput { + if string(ctx.([]byte)) == "WORLD\n" { + goterr <- nil + } + } + }) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6029?hello=world", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + + <-goterr + + assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) +} + func Test_Service_Middleware(t *testing.T) { logger, _ := test.NewNullLogger() logger.SetLevel(logrus.DebugLevel) diff --git a/static_pool.go b/static_pool.go index 48ff4853..b974ac90 100644 --- a/static_pool.go +++ b/static_pool.go @@ -86,7 +86,7 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) { p.muw.Lock() for _, w := range p.workers { - w.err.Listen(l) + w.err.Listen(p.lsn) } p.muw.Unlock() } @@ -211,6 +211,8 @@ func (w *Worker) start() error { if w.rl != nil { w.rl.Close() } + + w.err.Close() } }() |