summaryrefslogtreecommitdiff
path: root/transport
diff options
context:
space:
mode:
Diffstat (limited to 'transport')
-rwxr-xr-xtransport/pipe/pipe_factory.go21
-rwxr-xr-xtransport/pipe/pipe_factory_test.go27
-rwxr-xr-xtransport/socket/socket_factory.go20
-rwxr-xr-xtransport/socket/socket_factory_test.go40
4 files changed, 87 insertions, 21 deletions
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 0d46f496..84a9d311 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -29,7 +29,7 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
@@ -90,7 +90,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- pid, err := internal.FetchPID(relay)
+ // used as a ping
+ _, err = internal.Pid(relay)
if err != nil {
err = multierr.Combine(
err,
@@ -109,19 +110,6 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
- if pid != w.Pid() {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
select {
case
// return worker
@@ -177,7 +165,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// errors bundle
- if pid, err := internal.FetchPID(relay); pid != w.Pid() {
+ _, err = internal.Pid(relay)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index f8198610..b4ba8c87 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -179,6 +179,33 @@ func Test_Pipe_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Pipe_Echo_Script(t *testing.T) {
+ t.Parallel()
+ cmd := exec.Command("sh", "../../tests/pipes_test_script.sh")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Pipe_Broken(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index d98ce607..39c04eac 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -66,7 +66,7 @@ func (f *Factory) listen() error {
}
rl := socket.NewSocketRelay(conn)
- pid, err := internal.FetchPID(rl)
+ pid, err := internal.Pid(rl)
if err != nil {
return err
}
@@ -189,7 +189,8 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
w.AttachRelay(rl)
// errors bundle
- if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ _, err = internal.Pid(rl)
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
@@ -222,11 +223,20 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
return nil, err
}
default:
- tmp, ok := f.relays.LoadAndDelete(w.Pid())
- if !ok {
+ // find first pid and attach relay to it
+ var r *socket.Relay
+ f.relays.Range(func(k, val interface{}) bool {
+ r = val.(*socket.Relay)
+ f.relays.Delete(k)
+ return false
+ })
+
+ // no relay exists
+ if r == nil {
continue
}
- return tmp.(*socket.Relay), nil
+
+ return r, nil
}
}
}
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 879dba8e..d517d026 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -282,6 +282,46 @@ func Test_Tcp_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Tcp_Echo_Script(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err = ls.Close()
+ if err != nil {
+ t.Errorf("error closing the listener: error %v", err)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("sh", "../../tests/socket_test_script.sh")
+
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait())
+ }()
+ defer func() {
+ err = w.Stop()
+ if err != nil {
+ t.Errorf("error stopping the Process: error %v", err)
+ }
+ }()
+
+ sw := worker.From(w)
+
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
func Test_Unix_Start(t *testing.T) {
ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")