summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-14 15:23:22 +0300
committerValery Piashchynski <[email protected]>2020-10-14 15:23:22 +0300
commita407be25f068a5c0a20a4cf96ddfaf4ccd3af739 (patch)
tree143e1ca6729d28b1d50e151a8d8050ac67e6f5ec /worker.go
parentb5f429667131ef91498d67d08242b9f46cc23d6d (diff)
Fixed: race conditions in tests, Handle_Dead test activated
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go77
1 files changed, 30 insertions, 47 deletions
diff --git a/worker.go b/worker.go
index 855a9958..82bd99df 100644
--- a/worker.go
+++ b/worker.go
@@ -18,7 +18,7 @@ import (
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError int64 = iota + 100
+ EventWorkerError int64 = iota + 200
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
@@ -200,49 +200,37 @@ func (w *WorkerProcess) Events() <-chan WorkerEvent {
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
func (w *WorkerProcess) Wait(ctx context.Context) error {
- c := make(chan error)
- go func() {
- err := multierr.Combine(w.cmd.Wait())
- w.endState = w.cmd.ProcessState
- if err != nil {
- w.state.Set(StateErrored)
- // if there are messages in the events channel, read it
- // TODO potentially danger place
- if len(w.events) > 0 {
- select {
- case ev := <-w.events:
- err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
- }
- }
- // if no errors in the events, error might be in the errbuffer
- if w.errBuffer.Len() > 0 {
- err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ err := multierr.Combine(w.cmd.Wait())
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(StateErrored)
+ // if there are messages in the events channel, read it
+ // TODO potentially danger place
+ if len(w.events) > 0 {
+ select {
+ case ev := <-w.events:
+ err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
}
-
- c <- multierr.Append(err, w.closeRelay())
- return
}
-
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(StateErrored)
- c <- err
- return
+ // if no errors in the events, error might be in the errbuffer
+ if w.errBuffer.Len() > 0 {
+ err = multierr.Append(err, errors.New(w.errBuffer.String()))
}
- if w.endState.Success() {
- w.state.Set(StateStopped)
- }
- c <- nil
- }()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- return err
- }
+ return multierr.Append(err, w.closeRelay())
+ }
+
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(StateErrored)
+ return err
}
+
+ if w.endState.Success() {
+ w.state.Set(StateStopped)
+ }
+
+ return nil
}
func (w *WorkerProcess) closeRelay() error {
@@ -259,20 +247,15 @@ func (w *WorkerProcess) closeRelay() error {
func (w *WorkerProcess) Stop(ctx context.Context) error {
c := make(chan error)
go func() {
- var errs []string
+ var err error
w.errBuffer.Close()
w.state.Set(StateStopping)
w.mu.Lock()
defer w.mu.Unlock()
- err := sendControl(w.relay, &stopCommand{Stop: true})
+ err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
if err != nil {
- errs = append(errs, err.Error())
w.state.Set(StateKilling)
- err = w.cmd.Process.Kill()
- if err != nil {
- errs = append(errs, err.Error())
- }
- c <- errors.New(strings.Join(errs, "|"))
+ c <- multierr.Append(err, w.cmd.Process.Kill())
}
w.state.Set(StateStopped)
c <- nil