summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker')
-rwxr-xr-xpkg/worker/sync_worker.go9
-rwxr-xr-xpkg/worker/worker.go58
2 files changed, 27 insertions, 40 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index eacb8a8a..11992f22 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -63,9 +63,10 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithContext")
+func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("ExecWithTimeout")
c := make(chan wexec, 1)
+
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
@@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error {
return tw.w.Wait()
}
-func (tw *syncWorker) Stop(ctx context.Context) error {
- return tw.w.Stop(ctx)
+func (tw *syncWorker) Stop() error {
+ return tw.w.Stop()
}
func (tw *syncWorker) Kill() error {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index ae59d611..456f4bea 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -2,7 +2,6 @@ package worker
import (
"bytes"
- "context"
"fmt"
"io"
"os"
@@ -30,13 +29,6 @@ const (
ReadBufSize = 10240 // Kb
)
-var syncPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -79,6 +71,8 @@ type Process struct {
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
stop chan struct{}
+
+ syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -93,6 +87,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
}
w.rd, w.cmd.Stderr = io.Pipe()
@@ -217,30 +219,16 @@ func (w *Process) closeRelay() error {
}
// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(internal.StateStopping)
- err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
- if err != nil {
- w.state.Set(internal.StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(internal.StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
+func (w *Process) Stop() error {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ return multierr.Append(err, w.cmd.Process.Kill())
}
+ w.state.Set(internal.StateStopped)
+ return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
@@ -258,15 +246,12 @@ func (w *Process) Kill() error {
// put the pointer, to not allocate new slice
// but erase it len and then return back
func (w *Process) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- syncPool.Put(data)
+ w.syncPool.Put(data)
}
// get pointer to the byte slice
func (w *Process) get() *[]byte {
- return syncPool.Get().(*[]byte)
+ return w.syncPool.Get().(*[]byte)
}
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
@@ -282,6 +267,7 @@ func (w *Process) watch() {
w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
w.stderr.Write((*buf)[:n])
w.mu.Unlock()
w.put(buf)