diff options
-rwxr-xr-x | sync_worker_test.go | 1 | ||||
-rwxr-xr-x | worker.go | 20 |
2 files changed, 14 insertions, 7 deletions
diff --git a/sync_worker_test.go b/sync_worker_test.go index add0a066..ef552754 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -167,6 +167,7 @@ func Test_Broken(t *testing.T) { assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()") wg.Done() }) + syncWorker, err := NewSyncWorker(w) if err != nil { t.Fatal(err) @@ -310,11 +310,13 @@ func (w *WorkerProcess) logCallback(log []byte) { // thread safe errBuffer type errBuffer struct { - enable bool - mu sync.RWMutex - buf []byte - last int - wait *time.Timer + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo: remove update + update chan interface{} stop chan interface{} logCallback func(log []byte) } @@ -322,6 +324,7 @@ type errBuffer struct { func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb := &errBuffer{ buf: make([]byte, 0), + update: make(chan interface{}), wait: time.NewTimer(WaitDuration), stop: make(chan interface{}), logCallback: logCallback, @@ -330,6 +333,8 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { go func(eb *errBuffer) { for { select { + case <-eb.update: + eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() if eb.enable && len(eb.buf) > eb.last { @@ -369,9 +374,10 @@ func (eb *errBuffer) Len() int { // needed. The return value n is the length of pool; errBuffer is always nil. func (eb *errBuffer) Write(p []byte) (int, error) { eb.mu.Lock() - defer eb.mu.Unlock() - eb.buf = append(eb.buf, p...) + eb.mu.Unlock() + eb.update <- nil + return len(p), nil } |