diff options
author | Wolfy-J <[email protected]> | 2018-06-23 13:15:33 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-23 13:15:33 +0300 |
commit | 38d694411abbdb0c31b08b96452fa0604a93418a (patch) | |
tree | 828492153bd4893347534e6ee9858bbb4d45f233 | |
parent | 14a54572d7a3754aeb81d3dc9949276b7fff04fe (diff) |
support for realtime error aggegration
-rwxr-xr-x | build.sh | 7 | ||||
-rw-r--r-- | error_buffer.go | 16 | ||||
-rw-r--r-- | error_buffer_test.go | 17 | ||||
-rw-r--r-- | static_pool.go | 12 |
4 files changed, 47 insertions, 5 deletions
@@ -1,11 +1,8 @@ #!/bin/bash -set -e - cd $(dirname "${BASH_SOURCE[0]}") OD="$(pwd)" - # Pushes application version into the build information. -RR_VERSION=1.0.1 +RR_VERSION=1.0.2 # Hardcode some values to the core package LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}" @@ -47,4 +44,4 @@ if [ "$1" == "all" ]; then exit fi -CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go
\ No newline at end of file +CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go diff --git a/error_buffer.go b/error_buffer.go index fcf566c8..27f35e78 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -5,10 +5,22 @@ import ( "sync" ) +// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form. +const EventStderrOutput = 1900 + // thread safe errBuffer type errBuffer struct { mu sync.Mutex buffer *bytes.Buffer + lsn func(event int, ctx interface{}) +} + +// Listen attaches error stream even listener. +func (b *errBuffer) Listen(l func(event int, ctx interface{})) { + b.mu.Lock() + defer b.mu.Unlock() + + b.lsn = l } // Len returns the number of bytes of the unread portion of the errBuffer; @@ -27,6 +39,10 @@ func (b *errBuffer) Write(p []byte) (n int, err error) { b.mu.Lock() defer b.mu.Unlock() + if b.lsn != nil { + b.lsn(EventStderrOutput, p) + } + return b.buffer.Write(p) } diff --git a/error_buffer_test.go b/error_buffer_test.go index afbc80e2..aa6a17f2 100644 --- a/error_buffer_test.go +++ b/error_buffer_test.go @@ -12,3 +12,20 @@ func TestErrBuffer_Write_Len(t *testing.T) { assert.Equal(t, 5, buf.Len()) assert.Equal(t, buf.String(), "hello") } + +func TestErrBuffer_Write_Event(t *testing.T) { + buf := &errBuffer{buffer: new(bytes.Buffer)} + + tr := make(chan interface{}) + buf.Listen(func(event int, ctx interface{}) { + assert.Equal(t, EventStderrOutput, event) + assert.Equal(t, []byte("hello"), ctx) + close(tr) + }) + + buf.Write([]byte("hello")) + + <-tr + assert.Equal(t, 5, buf.Len()) + assert.Equal(t, buf.String(), "hello") +} diff --git a/static_pool.go b/static_pool.go index b3e4f488..79166197 100644 --- a/static_pool.go +++ b/static_pool.go @@ -83,6 +83,12 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) { defer p.mul.Unlock() p.lsn = l + + p.muw.RLock() + for _, w := range p.workers { + w.err.Listen(l) + } + p.muw.RUnlock() } // Config returns associated pool configuration. Immutable. @@ -207,6 +213,12 @@ func (p *StaticPool) createWorker() (*Worker, error) { return nil, err } + p.mul.Lock() + if p.lsn != nil { + w.err.Listen(p.lsn) + } + defer p.mul.Unlock() + p.throw(EventWorkerConstruct, w) p.muw.Lock() |