summaryrefslogtreecommitdiff
path: root/pkg/transport
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
committerValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
commit2a58b1be2c79f2fe10c0a429878937661645a928 (patch)
treef3a7cd472c75c4dd2a97bcf97cb154731ed81230 /pkg/transport
parent970014530a23d57a3be41c6369ac6456d0b36ae1 (diff)
- Fix bug with the worker reallocating during the response
- Update .golangci and fix new warnings Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport')
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go2
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go2
-rwxr-xr-xpkg/transport/socket/socket_factory.go14
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go56
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go42
5 files changed, 56 insertions, 60 deletions
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index a00b2117..51befb1e 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -281,7 +281,7 @@ func Test_Echo2(t *testing.T) {
assert.NoError(t, sw.Wait())
}()
defer func() {
- err := sw.Stop()
+ err = sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index fb40ecb0..3ef65be8 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -299,7 +299,7 @@ func Test_Echo(t *testing.T) {
assert.NoError(t, sw.Wait())
}()
defer func() {
- err := sw.Stop()
+ err = sw.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index f58f9561..990eb384 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -88,7 +88,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
const op = errors.Op("factory_spawn_worker_with_timeout")
c := make(chan socketSpawn)
go func() {
- ctx, cancel := context.WithTimeout(ctx, f.tout)
+ ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
@@ -108,7 +108,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
return
}
- rl, err := f.findRelayWithContext(ctx, w)
+ rl, err := f.findRelayWithContext(ctxT, w)
if err != nil {
err = multierr.Combine(
err,
@@ -179,18 +179,19 @@ func (f *Factory) Close() error {
// waits for Process to connect over socket and returns associated relay of timeout
func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
- ticker := time.NewTicker(time.Millisecond * 100)
+ ticker := time.NewTicker(time.Millisecond * 10)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
+ // check for the process exists
_, err := process.NewProcess(int32(w.Pid()))
if err != nil {
return nil, err
}
default:
- tmp, ok := f.relays.Load(w.Pid())
+ tmp, ok := f.relays.LoadAndDelete(w.Pid())
if !ok {
continue
}
@@ -221,8 +222,3 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
f.relays.Store(pid, relay)
}
-
-// deletes relay chan associated with specific pid
-func (f *Factory) removeRelayFromPid(pid int64) {
- f.relays.Delete(pid)
-}
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 1361693b..b875e2c8 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -19,8 +19,8 @@ func Test_Tcp_Start2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -55,7 +55,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
f := NewSocketServer(ls, time.Minute)
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -75,8 +75,8 @@ func Test_Tcp_StartError2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -131,8 +131,8 @@ func Test_Tcp_Invalid2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -151,8 +151,8 @@ func Test_Tcp_Broken2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -181,8 +181,8 @@ func Test_Tcp_Broken2(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -206,8 +206,8 @@ func Test_Tcp_Echo2(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -244,7 +244,7 @@ func Test_Unix_Start2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -268,7 +268,7 @@ func Test_Unix_Failboot2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -295,7 +295,7 @@ func Test_Unix_Timeout2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -311,7 +311,7 @@ func Test_Unix_Invalid2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -326,8 +326,8 @@ func Test_Unix_Broken2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
- assert.NoError(t, err)
+ errC := ls.Close()
+ assert.NoError(t, errC)
}()
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
@@ -351,8 +351,8 @@ func Test_Unix_Broken2(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -376,7 +376,7 @@ func Test_Unix_Echo2(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(t, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(t, err)
}()
@@ -412,7 +412,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(b, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(b, err)
}()
@@ -439,7 +439,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
assert.NoError(b, err)
defer func() {
- err := ls.Close()
+ err = ls.Close()
assert.NoError(b, err)
}()
@@ -472,8 +472,8 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
b.Errorf("error closing the listener: error %v", err)
}
}()
@@ -503,8 +503,8 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
b.Errorf("error closing the listener: error %v", err)
}
}()
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index a8dd0fe0..34fe088b 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -22,7 +22,7 @@ func Test_Tcp_Start(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -60,7 +60,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
f := NewSocketServer(ls, time.Minute)
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -82,7 +82,7 @@ func Test_Tcp_StartError(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -143,7 +143,7 @@ func Test_Tcp_Timeout(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -166,7 +166,7 @@ func Test_Tcp_Invalid(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -188,8 +188,8 @@ func Test_Tcp_Broken(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -218,8 +218,8 @@ func Test_Tcp_Broken(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) {
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -284,7 +284,7 @@ func Test_Unix_Start(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -314,7 +314,7 @@ func Test_Unix_Failboot(t *testing.T) {
ctx := context.Background()
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -347,7 +347,7 @@ func Test_Unix_Timeout(t *testing.T) {
ctx := context.Background()
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -369,7 +369,7 @@ func Test_Unix_Invalid(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -390,8 +390,8 @@ func Test_Unix_Broken(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
- if err != nil {
+ errC := ls.Close()
+ if errC != nil {
t.Errorf("error closing the listener: error %v", err)
}
}()
@@ -422,8 +422,8 @@ func Test_Unix_Broken(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
- err := w.Wait()
- assert.Error(t, err)
+ errW := w.Wait()
+ assert.Error(t, errW)
}()
defer func() {
@@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
t.Errorf("error closing the listener: error %v", err)
}
@@ -559,7 +559,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}
@@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}