From a407be25f068a5c0a20a4cf96ddfaf4ccd3af739 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 14 Oct 2020 15:23:22 +0300 Subject: Fixed: race conditions in tests, Handle_Dead test activated --- worker.go | 77 +++++++++++++++++++++++++-------------------------------------- 1 file changed, 30 insertions(+), 47 deletions(-) (limited to 'worker.go') diff --git a/worker.go b/worker.go index 855a9958..82bd99df 100644 --- a/worker.go +++ b/worker.go @@ -18,7 +18,7 @@ import ( // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError int64 = iota + 100 + EventWorkerError int64 = iota + 200 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog @@ -200,49 +200,37 @@ func (w *WorkerProcess) Events() <-chan WorkerEvent { // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. func (w *WorkerProcess) Wait(ctx context.Context) error { - c := make(chan error) - go func() { - err := multierr.Combine(w.cmd.Wait()) - w.endState = w.cmd.ProcessState - if err != nil { - w.state.Set(StateErrored) - // if there are messages in the events channel, read it - // TODO potentially danger place - if len(w.events) > 0 { - select { - case ev := <-w.events: - err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) - } - } - // if no errors in the events, error might be in the errbuffer - if w.errBuffer.Len() > 0 { - err = multierr.Append(err, errors.New(w.errBuffer.String())) + err := multierr.Combine(w.cmd.Wait()) + w.endState = w.cmd.ProcessState + if err != nil { + w.state.Set(StateErrored) + // if there are messages in the events channel, read it + // TODO potentially danger place + if len(w.events) > 0 { + select { + case ev := <-w.events: + err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) } - - c <- multierr.Append(err, w.closeRelay()) - return } - - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(StateErrored) - c <- err - return + // if no errors in the events, error might be in the errbuffer + if w.errBuffer.Len() > 0 { + err = multierr.Append(err, errors.New(w.errBuffer.String())) } - if w.endState.Success() { - w.state.Set(StateStopped) - } - c <- nil - }() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - return err - } + return multierr.Append(err, w.closeRelay()) + } + + err = multierr.Append(err, w.closeRelay()) + if err != nil { + w.state.Set(StateErrored) + return err } + + if w.endState.Success() { + w.state.Set(StateStopped) + } + + return nil } func (w *WorkerProcess) closeRelay() error { @@ -259,20 +247,15 @@ func (w *WorkerProcess) closeRelay() error { func (w *WorkerProcess) Stop(ctx context.Context) error { c := make(chan error) go func() { - var errs []string + var err error w.errBuffer.Close() w.state.Set(StateStopping) w.mu.Lock() defer w.mu.Unlock() - err := sendControl(w.relay, &stopCommand{Stop: true}) + err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) if err != nil { - errs = append(errs, err.Error()) w.state.Set(StateKilling) - err = w.cmd.Process.Kill() - if err != nil { - errs = append(errs, err.Error()) - } - c <- errors.New(strings.Join(errs, "|")) + c <- multierr.Append(err, w.cmd.Process.Kill()) } w.state.Set(StateStopped) c <- nil -- cgit v1.2.3