diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 65 |
1 files changed, 35 insertions, 30 deletions
@@ -58,6 +58,13 @@ type WorkerEvent struct { Payload interface{} } +var pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 10240) + return buf + }, +} + type WorkerBase interface { fmt.Stringer @@ -243,6 +250,8 @@ func (w *WorkerProcess) Wait() error { // if process return code > 0, here will be an error from stderr (if presents) if w.stderr.Len() > 0 { err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} } w.mu.RUnlock() @@ -311,47 +320,43 @@ func (w *WorkerProcess) Kill() error { return nil } +func (w *WorkerProcess) put(data []byte) { + data = make([]byte, 10240) + pool.Put(data) +} + +func (w *WorkerProcess) get() []byte { + return pool.Get().([]byte) +} + // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. func (w *WorkerProcess) watch() { - proxy := make(chan [10240]byte, 5) - go func() { for { select { case <-w.stop: + buf := w.get() // read the last data - var buf [10240]byte - _, err := w.rd.Read(buf[:]) - if err != nil { - panic(err) - } - proxy <- buf - // and close - close(proxy) + n, _ := w.rd.Read(buf[:]) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write(buf[:n]) + w.mu.Unlock() + w.put(buf) return default: - var buf [10240]byte - _, err := w.rd.Read(buf[:]) - if err != nil { - panic(err) - } - proxy <- buf + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(buf[:]) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write(buf[:n]) + w.mu.Unlock() + w.put(buf) } } }() - - for { - select { - case payload, ok := <-proxy: - if !ok { - return - } - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]}) - w.mu.Lock() - // write new message - w.stderr.Write(payload[:]) - w.mu.Unlock() - } - } } |