summaryrefslogtreecommitdiff
path: root/pkg/transport/socket
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/transport/socket')
-rwxr-xr-xpkg/transport/socket/socket_factory.go57
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go32
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go38
3 files changed, 78 insertions, 49 deletions
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 965a0f30..dc2b75cf 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -2,6 +2,7 @@ package socket
import (
"context"
+ "fmt"
"net"
"os/exec"
"sync"
@@ -29,8 +30,6 @@ type Factory struct {
// sockets which are waiting for process association
relays sync.Map
-
- ErrCh chan error
}
// NewSocketServer returns Factory attached to a given socket listener.
@@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
ls: ls,
tout: tout,
relays: sync.Map{},
- ErrCh: make(chan error, 10),
}
// Be careful
// https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
// https://github.com/golang/go/issues/5045
go func() {
- f.ErrCh <- f.listen()
+ err := f.listen()
+ // there is no logger here, use fmt
+ if err != nil {
+ fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
+ }
}()
return f
@@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
- err: err,
+ err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
err = w.Start()
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
rl, err := f.findRelayWithContext(ctxT, w)
@@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
w.Wait(),
)
- c <- socketSpawn{
+ select {
+ // try to write result
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ // if no receivers - return
+ default:
+ return
}
- return
}
w.AttachRelay(rl)
w.State().Set(worker.StateReady)
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: w,
err: nil,
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
@@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
+
+ // errors bundle
+ if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
+ }
+
w.State().Set(worker.StateReady)
return w, nil
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index b875e2c8..905a3b6b 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -16,7 +16,7 @@ import (
)
func Test_Tcp_Start2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -45,7 +45,7 @@ func Test_Tcp_Start2(t *testing.T) {
}
func Test_Tcp_StartCloseFactory2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
} else {
t.Skip("socket is busy")
@@ -72,7 +72,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
}
func Test_Tcp_StartError2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -96,7 +96,7 @@ func Test_Tcp_StartError2(t *testing.T) {
}
func Test_Tcp_Failboot2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err3 := ls.Close()
@@ -128,7 +128,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
}
func Test_Tcp_Invalid2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -148,7 +148,7 @@ func Test_Tcp_Invalid2(t *testing.T) {
}
func Test_Tcp_Broken2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -194,16 +194,15 @@ func Test_Tcp_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
func Test_Tcp_Echo2(t *testing.T) {
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -230,7 +229,7 @@ func Test_Tcp_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -363,11 +362,10 @@ func Test_Unix_Broken2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -398,7 +396,7 @@ func Test_Unix_Echo2(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -459,7 +457,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -528,7 +526,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index 34fe088b..f9bb2178 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -19,7 +19,7 @@ func Test_Tcp_Start(t *testing.T) {
ctx := context.Background()
time.Sleep(time.Millisecond * 10) // to ensure free socket
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -50,7 +50,7 @@ func Test_Tcp_Start(t *testing.T) {
func Test_Tcp_StartCloseFactory(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
} else {
t.Skip("socket is busy")
@@ -79,7 +79,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
func Test_Tcp_StartError(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -106,7 +106,7 @@ func Test_Tcp_Failboot(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err3 := ls.Close()
@@ -140,7 +140,7 @@ func Test_Tcp_Failboot(t *testing.T) {
func Test_Tcp_Timeout(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -163,7 +163,7 @@ func Test_Tcp_Timeout(t *testing.T) {
func Test_Tcp_Invalid(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -185,7 +185,7 @@ func Test_Tcp_Invalid(t *testing.T) {
func Test_Tcp_Broken(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
errC := ls.Close()
@@ -231,10 +231,9 @@ func Test_Tcp_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Body)
- assert.Nil(t, res.Context)
+ assert.Nil(t, res)
wg.Wait()
<-finish
}
@@ -242,7 +241,7 @@ func Test_Tcp_Broken(t *testing.T) {
func Test_Tcp_Echo(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
defer func() {
err = ls.Close()
@@ -269,7 +268,7 @@ func Test_Tcp_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -434,11 +433,10 @@ func Test_Unix_Broken(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res.Context)
- assert.Nil(t, res.Body)
+ assert.Nil(t, res)
<-block
wg.Wait()
}
@@ -475,7 +473,7 @@ func Test_Unix_Echo(t *testing.T) {
sw := worker.From(w)
- res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -487,7 +485,7 @@ func Test_Unix_Echo(t *testing.T) {
func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if err == nil {
defer func() {
err = ls.Close()
@@ -520,7 +518,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
ctx := context.Background()
- ls, err := net.Listen("tcp", "localhost:9007")
+ ls, err := net.Listen("tcp", "127.0.0.1:9007")
if err == nil {
defer func() {
err = ls.Close()
@@ -548,7 +546,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -613,7 +611,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
sw := worker.From(w)
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}