diff options
author | Wolfy-J <[email protected]> | 2018-06-23 20:45:39 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2018-06-23 20:45:39 +0300 |
commit | 2ecba8ab4109108170433862f6d4a29abaac65d8 (patch) | |
tree | 36cfe76e67998f414d7efb00f1a7c030bbfbcfad /error_buffer.go | |
parent | 14a54572d7a3754aeb81d3dc9949276b7fff04fe (diff) | |
parent | 18201f5f6af71fad14bbfc93eec1654b2f8fa585 (diff) |
Merge pull request #22 from spiral/feature/stderr
Feature/stderr
Diffstat (limited to 'error_buffer.go')
-rw-r--r-- | error_buffer.go | 108 |
1 files changed, 90 insertions, 18 deletions
diff --git a/error_buffer.go b/error_buffer.go index fcf566c8..8be9c5a8 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -1,39 +1,111 @@ package roadrunner import ( - "bytes" "sync" + "time" +) + +const ( + // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). + EventStderrOutput = 1900 + + // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output + // together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond ) // thread safe errBuffer type errBuffer struct { mu sync.Mutex - buffer *bytes.Buffer + buf []byte + last int + wait *time.Timer + update chan interface{} + stop chan interface{} + lsn func(event int, ctx interface{}) +} + +func newErrBuffer() *errBuffer { + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + } + + go func() { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return + } + } + }() + + return eb +} + +// Listen attaches error stream even listener. +func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.lsn = l } -// Len returns the number of bytes of the unread portion of the errBuffer; -// b.Len() == len(b.Bytes()). -func (b *errBuffer) Len() int { - b.mu.Lock() - defer b.mu.Unlock() +// Len returns the number of buf of the unread portion of the errBuffer; +// buf.Len() == len(buf.Bytes()). +func (eb *errBuffer) Len() int { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Len() + // currently active message + return len(eb.buf) } // Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. If the -// errBuffer becomes too large, Write will panic with ErrTooLarge. -func (b *errBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Write(p) + eb.buf = append(eb.buf, p...) + eb.update <- nil + + return len(p), nil } // Strings fetches all errBuffer data into string. -func (b *errBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() + + return string(eb.buf) +} - return b.buffer.String() +// Close aggregation timer. +func (eb *errBuffer) Close() error { + close(eb.stop) + return nil } |