diff options
Diffstat (limited to 'pkg/worker')
-rwxr-xr-x | pkg/worker/sync_worker.go | 9 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 58 |
2 files changed, 27 insertions, 40 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index eacb8a8a..11992f22 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -63,9 +63,10 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { - const op = errors.Op("ExecWithContext") +func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("ExecWithTimeout") c := make(chan wexec, 1) + go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ @@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error { return tw.w.Wait() } -func (tw *syncWorker) Stop(ctx context.Context) error { - return tw.w.Stop(ctx) +func (tw *syncWorker) Stop() error { + return tw.w.Stop() } func (tw *syncWorker) Kill() error { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index ae59d611..456f4bea 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -2,7 +2,6 @@ package worker import ( "bytes" - "context" "fmt" "io" "os" @@ -30,13 +29,6 @@ const ( ReadBufSize = 10240 // Kb ) -var syncPool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, -} - // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. @@ -79,6 +71,8 @@ type Process struct { rd io.Reader // stop signal terminates io.Pipe from reading from stderr stop chan struct{} + + syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -93,6 +87,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), + // sync pool for STDERR + // All receivers are pointers + syncPool: sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, + }, } w.rd, w.cmd.Stderr = io.Pipe() @@ -217,30 +219,16 @@ func (w *Process) closeRelay() error { } // Stop sends soft termination command to the Process and waits for process completion. -func (w *Process) Stop(ctx context.Context) error { - c := make(chan error) - - go func() { - var err error - w.state.Set(internal.StateStopping) - err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) - if err != nil { - w.state.Set(internal.StateKilling) - c <- multierr.Append(err, w.cmd.Process.Kill()) - } - w.state.Set(internal.StateStopped) - c <- nil - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - if err != nil { - return err - } - return nil +func (w *Process) Stop() error { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + return multierr.Append(err, w.cmd.Process.Kill()) } + w.state.Set(internal.StateStopped) + return nil } // Kill kills underlying process, make sure to call Wait() func to gather @@ -258,15 +246,12 @@ func (w *Process) Kill() error { // put the pointer, to not allocate new slice // but erase it len and then return back func (w *Process) put(data *[]byte) { - *data = (*data)[:0] - *data = (*data)[:cap(*data)] - - syncPool.Put(data) + w.syncPool.Put(data) } // get pointer to the byte slice func (w *Process) get() *[]byte { - return syncPool.Get().(*[]byte) + return w.syncPool.Get().(*[]byte) } // Write appends the contents of pool to the errBuffer, growing the errBuffer as @@ -282,6 +267,7 @@ func (w *Process) watch() { w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message + // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool w.stderr.Write((*buf)[:n]) w.mu.Unlock() w.put(buf) |