diff options
author | Valery Piashchynski <[email protected]> | 2021-08-12 15:38:19 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-12 15:38:19 +0300 |
commit | df27287c78d7b17d7c8f0e7fff59fa7cbf2a4f9f (patch) | |
tree | df0749155487eae6bcdbb2456885131a21916f4d /pkg/transport/pipe | |
parent | 67db4b5f7b66e9a32713133baed83c3ab7146bb8 (diff) | |
parent | ecbfc5c5265a9895f4e371ce4388f64df8714e63 (diff) |
#726: feat(plugin): new `jobs` plugin
#726: feat(plugin): new `jobs` plugin
Diffstat (limited to 'pkg/transport/pipe')
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 77 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 38 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 39 |
3 files changed, 93 insertions, 61 deletions
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 19f4f92d..9433a510 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -22,42 +22,54 @@ func NewPipeFactory() *Factory { return &Factory{} } -type SpawnResult struct { +type sr struct { w *worker.Process err error } // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - c := make(chan SpawnResult) +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } in, err := cmd.StdoutPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } out, err := cmd.StdinPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } // Init new PIPE relay @@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis // Start the worker err = w.Start() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } - // errors bundle pid, err := internal.FetchPID(relay) - if pid != w.Pid() || err != nil { + if err != nil { err = multierr.Combine( err, w.Kill(), w.Wait(), ) - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return } - return } - // everything ok, set ready state - w.State().Set(worker.StateReady) + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + select { + case // return worker - c <- SpawnResult{ + spCh <- sr{ w: w, err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return } }() select { case <-ctx.Done(): return nil, ctx.Err() - case res := <-c: + case res := <-spCh: if res.err != nil { return nil, res.err } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 51befb1e..f5e9669b 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -144,7 +144,7 @@ func Test_Pipe_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -168,11 +168,10 @@ func Test_Pipe_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { @@ -215,7 +214,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -238,7 +237,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -261,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -287,7 +286,7 @@ func Test_Echo2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -314,11 +313,10 @@ func Test_BadPayload2(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -358,7 +356,7 @@ func Test_Echo_Slow2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -387,10 +385,9 @@ func Test_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -418,10 +415,9 @@ func Test_Error2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -445,19 +441,19 @@ func Test_NumExecs2(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 3ef65be8..d243a93f 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -102,6 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) { func Test_Pipe_PipeError2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + // error cause _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) @@ -159,7 +160,7 @@ func Test_Pipe_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -184,11 +185,10 @@ func Test_Pipe_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) } func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { @@ -231,7 +231,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -255,7 +255,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -279,7 +279,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -305,7 +305,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -333,11 +333,10 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := sw.Exec(payload.Payload{}) + res, err := sw.Exec(&payload.Payload{}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) assert.Contains(t, err.Error(), "payload can not be empty") } @@ -379,7 +378,7 @@ func Test_Echo_Slow(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -409,10 +408,9 @@ func Test_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) time.Sleep(time.Second * 3) mu.Lock() @@ -441,10 +439,9 @@ func Test_Error(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) if errors.Is(errors.SoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") @@ -469,19 +466,19 @@ func Test_NumExecs(t *testing.T) { sw := worker.From(w) - _, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) - _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } |