diff options
Diffstat (limited to 'pkg/transport/socket')
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 57 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 32 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 38 |
3 files changed, 78 insertions, 49 deletions
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 965a0f30..dc2b75cf 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -2,6 +2,7 @@ package socket import ( "context" + "fmt" "net" "os/exec" "sync" @@ -29,8 +30,6 @@ type Factory struct { // sockets which are waiting for process association relays sync.Map - - ErrCh chan error } // NewSocketServer returns Factory attached to a given socket listener. @@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { ls: ls, tout: tout, relays: sync.Map{}, - ErrCh: make(chan error, 10), } // Be careful // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go // https://github.com/golang/go/issues/5045 go func() { - f.ErrCh <- f.listen() + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } }() return f @@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, - err: err, + err: errors.E(op, err), + }: + return + default: + return } - return } err = w.Start() if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } rl, err := f.findRelayWithContext(ctxT, w) @@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis w.Wait(), ) - c <- socketSpawn{ + select { + // try to write result + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + // if no receivers - return + default: + return } - return } w.AttachRelay(rl) w.State().Set(worker.StateReady) - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: w, err: nil, + }: + return + default: + _ = w.Kill() + return } }() @@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) + + // errors bundle + if pid, err := internal.FetchPID(rl); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + w.State().Set(worker.StateReady) return w, nil diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index b875e2c8..905a3b6b 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -16,7 +16,7 @@ import ( ) func Test_Tcp_Start2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -45,7 +45,7 @@ func Test_Tcp_Start2(t *testing.T) { } func Test_Tcp_StartCloseFactory2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { } else { t.Skip("socket is busy") @@ -72,7 +72,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { } func Test_Tcp_StartError2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -96,7 +96,7 @@ func Test_Tcp_StartError2(t *testing.T) { } func Test_Tcp_Failboot2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err3 := ls.Close() @@ -128,7 +128,7 @@ func Test_Tcp_Failboot2(t *testing.T) { } func Test_Tcp_Invalid2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -148,7 +148,7 @@ func Test_Tcp_Invalid2(t *testing.T) { } func Test_Tcp_Broken2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -194,16 +194,15 @@ func Test_Tcp_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } func Test_Tcp_Echo2(t *testing.T) { - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) wg.Wait() <-finish } @@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index 34fe088b..f9bb2178 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) { ctx := context.Background() time.Sleep(time.Millisecond * 10) // to ensure free socket - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -50,7 +50,7 @@ func Test_Tcp_Start(t *testing.T) { func Test_Tcp_StartCloseFactory(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { } else { t.Skip("socket is busy") @@ -79,7 +79,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -106,7 +106,7 @@ func Test_Tcp_Failboot(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err3 := ls.Close() @@ -140,7 +140,7 @@ func Test_Tcp_Failboot(t *testing.T) { func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -163,7 +163,7 @@ func Test_Tcp_Timeout(t *testing.T) { func Test_Tcp_Invalid(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -185,7 +185,7 @@ func Test_Tcp_Invalid(t *testing.T) { func Test_Tcp_Broken(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { errC := ls.Close() @@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Body) - assert.Nil(t, res.Context) + assert.Nil(t, res) wg.Wait() <-finish } @@ -242,7 +241,7 @@ func Test_Tcp_Broken(t *testing.T) { func Test_Tcp_Echo(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { err = ls.Close() @@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res.Context) - assert.Nil(t, res.Body) + assert.Nil(t, res) <-block wg.Wait() } @@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) { sw := worker.From(w) - res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -487,7 +485,7 @@ func Test_Unix_Echo(t *testing.T) { func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { err = ls.Close() @@ -520,7 +518,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { ctx := context.Background() - ls, err := net.Listen("tcp", "localhost:9007") + ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { err = ls.Close() @@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { sw := worker.From(w) for n := 0; n < b.N; n++ { - if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } |