summaryrefslogtreecommitdiff
path: root/pkg/socket
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-20 18:28:46 +0300
committerValery Piashchynski <[email protected]>2020-12-20 18:28:46 +0300
commitf4a36c7f684216fb408693a6c494486144df57cf (patch)
treee1b61bf7e74cb63aa45f9ca0284a4cffe8e06b0e /pkg/socket
parentfbd5adde5abae6f7adb7fcdafc226bcd3480d498 (diff)
parenta10d20d20e910ed8fcfbc3bc690aaf17ee338ff3 (diff)
Merge remote-tracking branch 'origin/2.0' into plugin/redis
# Conflicts: # go.sum # pkg/pipe/pipe_factory_test.go # pkg/pool/static_pool.go # plugins/rpc/plugin.go
Diffstat (limited to 'pkg/socket')
-rwxr-xr-xpkg/socket/socket_factory.go15
-rwxr-xr-xpkg/socket/socket_factory_test.go14
2 files changed, 15 insertions, 14 deletions
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index f721ad66..b08d24e4 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -9,11 +9,12 @@ import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
@@ -65,7 +66,7 @@ func (f *Factory) listen() error {
return err
}
- rl := goridge.NewSocketRelay(conn)
+ rl := socket.NewSocketRelay(conn)
pid, err := internal.FetchPID(rl)
if err != nil {
return err
@@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error {
}
// waits for Process to connect over socket and returns associated relay of timeout
-func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) {
+func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
ticker := time.NewTicker(time.Millisecond * 100)
for {
select {
@@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
if !ok {
continue
}
- return tmp.(*goridge.SocketRelay), nil
+ return tmp.(*socket.Relay), nil
}
}
}
-func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) {
+func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
const op = errors.Op("find_relay")
// poll every 1ms for the relay
pollDone := time.NewTimer(f.tout)
@@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error)
if !ok {
continue
}
- return tmp.(*goridge.SocketRelay), nil
+ return tmp.(*socket.Relay), nil
}
}
}
// chan to store relay associated with specific pid
-func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) {
+func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
f.relays.Store(pid, relay)
}
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index f1a7d637..6a88713a 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -8,7 +8,7 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(internal.Payload{Body: []byte("hello")})
+ res, err := sw.Exec(payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}