diff options
author | Valery Piashchynski <[email protected]> | 2020-12-20 18:28:46 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-20 18:28:46 +0300 |
commit | f4a36c7f684216fb408693a6c494486144df57cf (patch) | |
tree | e1b61bf7e74cb63aa45f9ca0284a4cffe8e06b0e /pkg/socket | |
parent | fbd5adde5abae6f7adb7fcdafc226bcd3480d498 (diff) | |
parent | a10d20d20e910ed8fcfbc3bc690aaf17ee338ff3 (diff) |
Merge remote-tracking branch 'origin/2.0' into plugin/redis
# Conflicts:
# go.sum
# pkg/pipe/pipe_factory_test.go
# pkg/pool/static_pool.go
# plugins/rpc/plugin.go
Diffstat (limited to 'pkg/socket')
-rwxr-xr-x | pkg/socket/socket_factory.go | 15 | ||||
-rwxr-xr-x | pkg/socket/socket_factory_test.go | 14 |
2 files changed, 15 insertions, 14 deletions
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index f721ad66..b08d24e4 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -9,11 +9,12 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -65,7 +66,7 @@ func (f *Factory) listen() error { return err } - rl := goridge.NewSocketRelay(conn) + rl := socket.NewSocketRelay(conn) pid, err := internal.FetchPID(rl) if err != nil { return err @@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error { } // waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } -func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } // chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go index f1a7d637..6a88713a 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } |