diff options
author | Valery Piashchynski <[email protected]> | 2021-03-28 14:00:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-03-28 14:00:54 +0300 |
commit | 2a58b1be2c79f2fe10c0a429878937661645a928 (patch) | |
tree | f3a7cd472c75c4dd2a97bcf97cb154731ed81230 /pkg/transport | |
parent | 970014530a23d57a3be41c6369ac6456d0b36ae1 (diff) |
- Fix bug with the worker reallocating during the response
- Update .golangci and fix new warnings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport')
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 2 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 14 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 56 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 42 |
5 files changed, 56 insertions, 60 deletions
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index a00b2117..51befb1e 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -281,7 +281,7 @@ func Test_Echo2(t *testing.T) { assert.NoError(t, sw.Wait()) }() defer func() { - err := sw.Stop() + err = sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index fb40ecb0..3ef65be8 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -299,7 +299,7 @@ func Test_Echo(t *testing.T) { assert.NoError(t, sw.Wait()) }() defer func() { - err := sw.Stop() + err = sw.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index f58f9561..990eb384 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -88,7 +88,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis const op = errors.Op("factory_spawn_worker_with_timeout") c := make(chan socketSpawn) go func() { - ctx, cancel := context.WithTimeout(ctx, f.tout) + ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { @@ -108,7 +108,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis return } - rl, err := f.findRelayWithContext(ctx, w) + rl, err := f.findRelayWithContext(ctxT, w) if err != nil { err = multierr.Combine( err, @@ -179,18 +179,19 @@ func (f *Factory) Close() error { // waits for Process to connect over socket and returns associated relay of timeout func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { - ticker := time.NewTicker(time.Millisecond * 100) + ticker := time.NewTicker(time.Millisecond * 10) for { select { case <-ctx.Done(): return nil, ctx.Err() case <-ticker.C: + // check for the process exists _, err := process.NewProcess(int32(w.Pid())) if err != nil { return nil, err } default: - tmp, ok := f.relays.Load(w.Pid()) + tmp, ok := f.relays.LoadAndDelete(w.Pid()) if !ok { continue } @@ -221,8 +222,3 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } - -// deletes relay chan associated with specific pid -func (f *Factory) removeRelayFromPid(pid int64) { - f.relays.Delete(pid) -} diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 1361693b..b875e2c8 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -19,8 +19,8 @@ func Test_Tcp_Start2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -55,7 +55,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { f := NewSocketServer(ls, time.Minute) defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -75,8 +75,8 @@ func Test_Tcp_StartError2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -131,8 +131,8 @@ func Test_Tcp_Invalid2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -151,8 +151,8 @@ func Test_Tcp_Broken2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -181,8 +181,8 @@ func Test_Tcp_Broken2(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -206,8 +206,8 @@ func Test_Tcp_Echo2(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -244,7 +244,7 @@ func Test_Unix_Start2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -268,7 +268,7 @@ func Test_Unix_Failboot2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -295,7 +295,7 @@ func Test_Unix_Timeout2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -311,7 +311,7 @@ func Test_Unix_Invalid2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -326,8 +326,8 @@ func Test_Unix_Broken2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() - assert.NoError(t, err) + errC := ls.Close() + assert.NoError(t, errC) }() cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") @@ -351,8 +351,8 @@ func Test_Unix_Broken2(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -376,7 +376,7 @@ func Test_Unix_Echo2(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(t, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(t, err) }() @@ -412,7 +412,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(b, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(b, err) }() @@ -439,7 +439,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") assert.NoError(b, err) defer func() { - err := ls.Close() + err = ls.Close() assert.NoError(b, err) }() @@ -472,8 +472,8 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { b.Errorf("error closing the listener: error %v", err) } }() @@ -503,8 +503,8 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { b.Errorf("error closing the listener: error %v", err) } }() diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index a8dd0fe0..34fe088b 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -22,7 +22,7 @@ func Test_Tcp_Start(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -60,7 +60,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { f := NewSocketServer(ls, time.Minute) defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -82,7 +82,7 @@ func Test_Tcp_StartError(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -143,7 +143,7 @@ func Test_Tcp_Timeout(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -166,7 +166,7 @@ func Test_Tcp_Invalid(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -188,8 +188,8 @@ func Test_Tcp_Broken(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -218,8 +218,8 @@ func Test_Tcp_Broken(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) { ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -284,7 +284,7 @@ func Test_Unix_Start(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -314,7 +314,7 @@ func Test_Unix_Failboot(t *testing.T) { ctx := context.Background() if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -347,7 +347,7 @@ func Test_Unix_Timeout(t *testing.T) { ctx := context.Background() if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -369,7 +369,7 @@ func Test_Unix_Invalid(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -390,8 +390,8 @@ func Test_Unix_Broken(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() - if err != nil { + errC := ls.Close() + if errC != nil { t.Errorf("error closing the listener: error %v", err) } }() @@ -422,8 +422,8 @@ func Test_Unix_Broken(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err := w.Wait() - assert.Error(t, err) + errW := w.Wait() + assert.Error(t, errW) }() defer func() { @@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { t.Errorf("error closing the listener: error %v", err) } @@ -559,7 +559,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } @@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } |