summaryrefslogtreecommitdiff
path: root/pkg/worker/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-24 11:05:00 +0300
committerGitHub <[email protected]>2020-12-24 11:05:00 +0300
commit439c93225d7a9ebaf7cbf1010a54594b906f7d54 (patch)
treeadc7d78e28a9512cb1b1ba85cb4f19dec7d1323d /pkg/worker/worker.go
parent40b6c3169931a3fef62b649db19ff01dc685b7d4 (diff)
parent62ee1770cc233328300438ffd690ea1d8fc747bb (diff)
Merge pull request #462 from spiral/feature/CLIv2.0.0-alpha29
feature/CLI
Diffstat (limited to 'pkg/worker/worker.go')
-rwxr-xr-xpkg/worker/worker.go31
1 files changed, 29 insertions, 2 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 456f4bea..db182a3e 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -29,6 +29,8 @@ const (
ReadBufSize = 10240 // Kb
)
+type Options func(p *Process)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -76,7 +78,7 @@ type Process struct {
}
// InitBaseWorker creates new Process over given exec.cmd.
-func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
@@ -103,6 +105,11 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
// at this point we know, that stderr will contain huge messages
w.stderr.Grow(ReadBufSize)
+ // add options
+ for i := 0; i < len(options); i++ {
+ options[i](w)
+ }
+
go func() {
w.watch()
}()
@@ -110,6 +117,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
return w, nil
}
+func AddListeners(listeners ...events.EventListener) Options {
+ return func(p *Process) {
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -121,7 +136,7 @@ func (w *Process) Created() time.Time {
}
// AddListener registers new worker event listener.
-func (w *Process) AddListener(listener events.EventListener) {
+func (w *Process) addListener(listener events.EventListener) {
w.events.AddListener(listener)
}
@@ -176,6 +191,10 @@ func (w *Process) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+ if w.State().Value() == internal.StateDestroyed {
+ return errors.E(op, err)
+ }
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -234,6 +253,14 @@ func (w *Process) Stop() error {
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
+ if w.State().Value() == internal.StateDestroyed {
+ err := w.cmd.Process.Signal(os.Kill)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+
w.state.Set(internal.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {