summaryrefslogtreecommitdiff
path: root/pkg/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker/worker.go')
-rwxr-xr-xpkg/worker/worker.go31
1 files changed, 29 insertions, 2 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 456f4bea..db182a3e 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -29,6 +29,8 @@ const (
ReadBufSize = 10240 // Kb
)
+type Options func(p *Process)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -76,7 +78,7 @@ type Process struct {
}
// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
@@ -103,6 +105,11 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
// at this point we know, that stderr will contain huge messages
w.stderr.Grow(ReadBufSize)
+ // add options
+ for i := 0; i < len(options); i++ {
+ options[i](w)
+ }
+
go func() {
w.watch()
}()
@@ -110,6 +117,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
return w, nil
}
+func AddListeners(listeners ...events.EventListener) Options {
+ return func(p *Process) {
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -121,7 +136,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) AddListener(listener events.EventListener) {
+func (w *Process) addListener(listener events.EventListener) {
w.events.AddListener(listener)
}
@@ -176,6 +191,10 @@ func (w *Process) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+ if w.State().Value() == internal.StateDestroyed {
+ return errors.E(op, err)
+ }
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -234,6 +253,14 @@ func (w *Process) Stop() error {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
+ if w.State().Value() == internal.StateDestroyed {
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
w.state.Set(internal.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {