summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xsync_worker_test.go1
-rwxr-xr-xworker.go20
2 files changed, 14 insertions, 7 deletions
diff --git a/sync_worker_test.go b/sync_worker_test.go
index add0a066..ef552754 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -167,6 +167,7 @@ func Test_Broken(t *testing.T) {
assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()")
wg.Done()
})
+
syncWorker, err := NewSyncWorker(w)
if err != nil {
t.Fatal(err)
diff --git a/worker.go b/worker.go
index 0f9b134d..f815ddea 100755
--- a/worker.go
+++ b/worker.go
@@ -310,11 +310,13 @@ func (w *WorkerProcess) logCallback(log []byte) {
// thread safe errBuffer
type errBuffer struct {
- enable bool
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
+ enable bool
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
+ // todo: remove update
+ update chan interface{}
stop chan interface{}
logCallback func(log []byte)
}
@@ -322,6 +324,7 @@ type errBuffer struct {
func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb := &errBuffer{
buf: make([]byte, 0),
+ update: make(chan interface{}),
wait: time.NewTimer(WaitDuration),
stop: make(chan interface{}),
logCallback: logCallback,
@@ -330,6 +333,8 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
go func(eb *errBuffer) {
for {
select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
case <-eb.wait.C:
eb.mu.Lock()
if eb.enable && len(eb.buf) > eb.last {
@@ -369,9 +374,10 @@ func (eb *errBuffer) Len() int {
// needed. The return value n is the length of pool; errBuffer is always nil.
func (eb *errBuffer) Write(p []byte) (int, error) {
eb.mu.Lock()
- defer eb.mu.Unlock()
-
eb.buf = append(eb.buf, p...)
+ eb.mu.Unlock()
+ eb.update <- nil
+
return len(p), nil
}