summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go65
1 files changed, 35 insertions, 30 deletions
diff --git a/worker.go b/worker.go
index f17f2c07..e639c122 100755
--- a/worker.go
+++ b/worker.go
@@ -58,6 +58,13 @@ type WorkerEvent struct {
Payload interface{}
}
+var pool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, 10240)
+ return buf
+ },
+}
+
type WorkerBase interface {
fmt.Stringer
@@ -243,6 +250,8 @@ func (w *WorkerProcess) Wait() error {
// if process return code > 0, here will be an error from stderr (if presents)
if w.stderr.Len() > 0 {
err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
}
w.mu.RUnlock()
@@ -311,47 +320,43 @@ func (w *WorkerProcess) Kill() error {
return nil
}
+func (w *WorkerProcess) put(data []byte) {
+ data = make([]byte, 10240)
+ pool.Put(data)
+}
+
+func (w *WorkerProcess) get() []byte {
+ return pool.Get().([]byte)
+}
+
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
func (w *WorkerProcess) watch() {
- proxy := make(chan [10240]byte, 5)
-
go func() {
for {
select {
case <-w.stop:
+ buf := w.get()
// read the last data
- var buf [10240]byte
- _, err := w.rd.Read(buf[:])
- if err != nil {
- panic(err)
- }
- proxy <- buf
- // and close
- close(proxy)
+ n, _ := w.rd.Read(buf[:])
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write(buf[:n])
+ w.mu.Unlock()
+ w.put(buf)
return
default:
- var buf [10240]byte
- _, err := w.rd.Read(buf[:])
- if err != nil {
- panic(err)
- }
- proxy <- buf
+ // read the max 10kb of stderr per one read
+ buf := w.get()
+ n, _ := w.rd.Read(buf[:])
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: buf[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write(buf[:n])
+ w.mu.Unlock()
+ w.put(buf)
}
}
}()
-
- for {
- select {
- case payload, ok := <-proxy:
- if !ok {
- return
- }
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]})
- w.mu.Lock()
- // write new message
- w.stderr.Write(payload[:])
- w.mu.Unlock()
- }
- }
}