diff options
Diffstat (limited to 'error_buffer.go')
-rw-r--r-- | error_buffer.go | 113 |
1 files changed, 0 insertions, 113 deletions
diff --git a/error_buffer.go b/error_buffer.go deleted file mode 100644 index 0fc020c7..00000000 --- a/error_buffer.go +++ /dev/null @@ -1,113 +0,0 @@ -package roadrunner - -import ( - "sync" - "time" -) - -const ( - // EventStderrOutput - is triggered when worker sends data into stderr. The context - // is error message ([]byte). - EventStderrOutput = 1900 - - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 100 * time.Millisecond -) - -// thread safe errBuffer -type errBuffer struct { - mu sync.Mutex - buf []byte - last int - wait *time.Timer - update chan interface{} - stop chan interface{} - lsn func(event int, ctx interface{}) -} - -func newErrBuffer() *errBuffer { - eb := &errBuffer{ - buf: make([]byte, 0), - update: make(chan interface{}), - wait: time.NewTimer(WaitDuration), - stop: make(chan interface{}), - } - - go func() { - for { - select { - case <-eb.update: - eb.wait.Reset(WaitDuration) - case <-eb.wait.C: - eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb.lsn != nil { - eb.lsn(EventStderrOutput, eb.buf[eb.last:]) - eb.buf = eb.buf[0:0] - } - - eb.last = len(eb.buf) - } - eb.mu.Unlock() - case <-eb.stop: - eb.wait.Stop() - - eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb.lsn != nil { - eb.lsn(EventStderrOutput, eb.buf[eb.last:]) - } - - eb.last = len(eb.buf) - } - eb.mu.Unlock() - return - } - } - }() - - return eb -} - -// Listen attaches error stream even listener. -func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { - eb.mu.Lock() - eb.lsn = l - eb.mu.Unlock() -} - -// Len returns the number of buf of the unread portion of the errBuffer; -// buf.Len() == len(buf.Bytes()). -func (eb *errBuffer) Len() int { - eb.mu.Lock() - defer eb.mu.Unlock() - - // currently active message - return len(eb.buf) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; err is always nil. -func (eb *errBuffer) Write(p []byte) (int, error) { - eb.mu.Lock() - eb.buf = append(eb.buf, p...) - eb.mu.Unlock() - eb.update <- nil - - return len(p), nil -} - -// Strings fetches all errBuffer data into string. -func (eb *errBuffer) String() string { - eb.mu.Lock() - defer eb.mu.Unlock() - - return string(eb.buf) -} - -// Close aggregation timer. -func (eb *errBuffer) Close() error { - close(eb.stop) - return nil -} |