diff options
Diffstat (limited to 'transport')
-rwxr-xr-x | transport/pipe/pipe_factory.go | 21 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 27 | ||||
-rwxr-xr-x | transport/socket/socket_factory.go | 20 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 40 |
4 files changed, 87 insertions, 21 deletions
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 0d46f496..84a9d311 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -29,7 +29,7 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { @@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - pid, err := internal.FetchPID(relay) + // used as a ping + _, err = internal.Pid(relay) if err != nil { err = multierr.Combine( err, @@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } - if pid != w.Pid() { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), - }: - return - default: - _ = w.Kill() - return - } - } - select { case // return worker @@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { + _, err = internal.Pid(relay) + if err != nil { err = multierr.Combine( err, w.Kill(), diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index f8198610..b4ba8c87 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Pipe_Echo_Script(t *testing.T) { + t.Parallel() + cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Pipe_Broken(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index d98ce607..39c04eac 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -66,7 +66,7 @@ func (f *Factory) listen() error { } rl := socket.NewSocketRelay(conn) - pid, err := internal.FetchPID(rl) + pid, err := internal.Pid(rl) if err != nil { return err } @@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor w.AttachRelay(rl) // errors bundle - if pid, err := internal.FetchPID(rl); pid != w.Pid() { + _, err = internal.Pid(rl) + if err != nil { err = multierr.Combine( err, w.Kill(), @@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess return nil, err } default: - tmp, ok := f.relays.LoadAndDelete(w.Pid()) - if !ok { + // find first pid and attach relay to it + var r *socket.Relay + f.relays.Range(func(k, val interface{}) bool { + r = val.(*socket.Relay) + f.relays.Delete(k) + return false + }) + + // no relay exists + if r == nil { continue } - return tmp.(*socket.Relay), nil + + return r, nil } } } diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index 879dba8e..d517d026 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Tcp_Echo_Script(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("sh", "../../tests/socket_test_script.sh") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + func Test_Unix_Start(t *testing.T) { ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") |