summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-23 13:15:33 +0300
committerWolfy-J <[email protected]>2018-06-23 13:15:33 +0300
commit38d694411abbdb0c31b08b96452fa0604a93418a (patch)
tree828492153bd4893347534e6ee9858bbb4d45f233
parent14a54572d7a3754aeb81d3dc9949276b7fff04fe (diff)
support for realtime error aggegration
-rwxr-xr-xbuild.sh7
-rw-r--r--error_buffer.go16
-rw-r--r--error_buffer_test.go17
-rw-r--r--static_pool.go12
4 files changed, 47 insertions, 5 deletions
diff --git a/build.sh b/build.sh
index fd84fc79..ca5d4987 100755
--- a/build.sh
+++ b/build.sh
@@ -1,11 +1,8 @@
#!/bin/bash
-set -e
-
cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
-
# Pushes application version into the build information.
-RR_VERSION=1.0.1
+RR_VERSION=1.0.2
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
@@ -47,4 +44,4 @@ if [ "$1" == "all" ]; then
exit
fi
-CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go \ No newline at end of file
+CGO_ENABLED=0 go build -ldflags "$LDFLAGS -extldflags '-static'" -o "$OD/rr" cmd/rr/main.go
diff --git a/error_buffer.go b/error_buffer.go
index fcf566c8..27f35e78 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -5,10 +5,22 @@ import (
"sync"
)
+// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form.
+const EventStderrOutput = 1900
+
// thread safe errBuffer
type errBuffer struct {
mu sync.Mutex
buffer *bytes.Buffer
+ lsn func(event int, ctx interface{})
+}
+
+// Listen attaches error stream even listener.
+func (b *errBuffer) Listen(l func(event int, ctx interface{})) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ b.lsn = l
}
// Len returns the number of bytes of the unread portion of the errBuffer;
@@ -27,6 +39,10 @@ func (b *errBuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
+ if b.lsn != nil {
+ b.lsn(EventStderrOutput, p)
+ }
+
return b.buffer.Write(p)
}
diff --git a/error_buffer_test.go b/error_buffer_test.go
index afbc80e2..aa6a17f2 100644
--- a/error_buffer_test.go
+++ b/error_buffer_test.go
@@ -12,3 +12,20 @@ func TestErrBuffer_Write_Len(t *testing.T) {
assert.Equal(t, 5, buf.Len())
assert.Equal(t, buf.String(), "hello")
}
+
+func TestErrBuffer_Write_Event(t *testing.T) {
+ buf := &errBuffer{buffer: new(bytes.Buffer)}
+
+ tr := make(chan interface{})
+ buf.Listen(func(event int, ctx interface{}) {
+ assert.Equal(t, EventStderrOutput, event)
+ assert.Equal(t, []byte("hello"), ctx)
+ close(tr)
+ })
+
+ buf.Write([]byte("hello"))
+
+ <-tr
+ assert.Equal(t, 5, buf.Len())
+ assert.Equal(t, buf.String(), "hello")
+}
diff --git a/static_pool.go b/static_pool.go
index b3e4f488..79166197 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -83,6 +83,12 @@ func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
defer p.mul.Unlock()
p.lsn = l
+
+ p.muw.RLock()
+ for _, w := range p.workers {
+ w.err.Listen(l)
+ }
+ p.muw.RUnlock()
}
// Config returns associated pool configuration. Immutable.
@@ -207,6 +213,12 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return nil, err
}
+ p.mul.Lock()
+ if p.lsn != nil {
+ w.err.Listen(p.lsn)
+ }
+ defer p.mul.Unlock()
+
p.throw(EventWorkerConstruct, w)
p.muw.Lock()