diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 36 |
1 files changed, 21 insertions, 15 deletions
@@ -26,16 +26,28 @@ const ( // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError int64 = iota + 200 + EventWorkerError Event = iota + 200 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog ) +type Event int64 + +func (ev Event) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + // WorkerEvent wraps worker events. type WorkerEvent struct { // Event id, see below. - Event int64 + Event Event // Worker triggered the event. Worker WorkerBase @@ -298,13 +310,11 @@ func (w *WorkerProcess) logCallback(log []byte) { // thread safe errBuffer type errBuffer struct { - enable bool - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo: remove update - update chan interface{} + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer stop chan interface{} logCallback func(log []byte) } @@ -312,7 +322,6 @@ type errBuffer struct { func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb := &errBuffer{ buf: make([]byte, 0), - update: make(chan interface{}), wait: time.NewTimer(WaitDuration), stop: make(chan interface{}), logCallback: logCallback, @@ -321,8 +330,6 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { go func(eb *errBuffer) { for { select { - case <-eb.update: - eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() if eb.enable && len(eb.buf) > eb.last { @@ -362,10 +369,9 @@ func (eb *errBuffer) Len() int { // needed. The return value n is the length of pool; errBuffer is always nil. func (eb *errBuffer) Write(p []byte) (int, error) { eb.mu.Lock() - eb.buf = append(eb.buf, p...) - eb.mu.Unlock() - eb.update <- nil + defer eb.mu.Unlock() + eb.buf = append(eb.buf, p...) return len(p), nil } |