summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xgo.mod2
-rwxr-xr-xgo.sum4
-rw-r--r--interfaces/worker/worker.go6
-rwxr-xr-xinternal/protocol.go37
-rwxr-xr-xpkg/pipe/pipe_factory.go6
-rwxr-xr-xpkg/pipe/pipe_factory_test.go2
-rwxr-xr-xpkg/socket/socket_factory.go15
-rwxr-xr-xpkg/worker/sync_worker.go38
-rwxr-xr-xpkg/worker/worker.go8
-rw-r--r--plugins/checker/tests/plugin_test.go4
-rw-r--r--plugins/http/tests/http_test.go6
-rw-r--r--plugins/informer/tests/informer_test.go4
-rw-r--r--plugins/metrics/tests/metrics_test.go28
-rw-r--r--plugins/resetter/tests/resetter_test.go4
-rwxr-xr-xplugins/rpc/plugin.go6
-rw-r--r--plugins/rpc/tests/plugin2.go4
16 files changed, 90 insertions, 84 deletions
diff --git a/go.mod b/go.mod
index 9bd0307d..0dfb50b6 100755
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
github.com/spf13/viper v1.7.1
github.com/spiral/endure v1.0.0-beta20
github.com/spiral/errors v1.0.6
- github.com/spiral/goridge/v3 v3.0.0-beta7
+ github.com/spiral/goridge/v3 v3.0.0-beta8
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/yookoala/gofast v0.4.0
diff --git a/go.sum b/go.sum
index 83876b77..0e021f69 100755
--- a/go.sum
+++ b/go.sum
@@ -382,8 +382,12 @@ github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U=
github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8=
+github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k=
github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM=
github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4=
+github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
index 773dd044..f830fdf2 100644
--- a/interfaces/worker/worker.go
+++ b/interfaces/worker/worker.go
@@ -5,7 +5,7 @@ import (
"fmt"
"time"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
@@ -47,10 +47,10 @@ type BaseProcess interface {
Kill() error
// Relay returns attached to worker goridge relay
- Relay() goridge.Relay
+ Relay() relay.Relay
// AttachRelay used to attach goridge relay to the worker process
- AttachRelay(rl goridge.Relay)
+ AttachRelay(rl relay.Relay)
}
type SyncWorker interface {
diff --git a/internal/protocol.go b/internal/protocol.go
index 5aa681eb..a099ce4d 100755
--- a/internal/protocol.go
+++ b/internal/protocol.go
@@ -5,7 +5,8 @@ import (
j "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
)
var json = j.ConfigCompatibleWithStandardLibrary
@@ -18,39 +19,39 @@ type pidCommand struct {
Pid int `json:"pid"`
}
-func SendControl(rl goridge.Relay, v interface{}) error {
+func SendControl(rl relay.Relay, payload interface{}) error {
const op = errors.Op("send control frame")
- frame := goridge.NewFrame()
- frame.WriteVersion(goridge.VERSION_1)
- frame.WriteFlags(goridge.CONTROL)
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
+ fr.WriteFlags(frame.CONTROL)
- if data, ok := v.([]byte); ok {
+ if data, ok := payload.([]byte); ok {
// check if payload no more that 4Gb
if uint32(len(data)) > ^uint32(0) {
return errors.E(op, errors.Str("payload is more that 4gb"))
}
- frame.WritePayloadLen(uint32(len(data)))
- frame.WritePayload(data)
- frame.WriteCRC()
+ fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayload(data)
+ fr.WriteCRC()
- err := rl.Send(frame)
+ err := rl.Send(fr)
if err != nil {
return errors.E(op, err)
}
return nil
}
- data, err := json.Marshal(v)
+ data, err := json.Marshal(payload)
if err != nil {
return errors.E(op, errors.Errorf("invalid payload: %s", err))
}
- frame.WritePayloadLen(uint32(len(data)))
- frame.WritePayload(data)
- frame.WriteCRC()
+ fr.WritePayloadLen(uint32(len(data)))
+ fr.WritePayload(data)
+ fr.WriteCRC()
- err = rl.Send(frame)
+ err = rl.Send(fr)
if err != nil {
return errors.E(op, err)
}
@@ -58,14 +59,14 @@ func SendControl(rl goridge.Relay, v interface{}) error {
return nil
}
-func FetchPID(rl goridge.Relay) (int64, error) {
+func FetchPID(rl relay.Relay) (int64, error) {
const op = errors.Op("fetchPID")
err := SendControl(rl, pidCommand{Pid: os.Getpid()})
if err != nil {
return 0, errors.E(op, err)
}
- frameR := goridge.NewFrame()
+ frameR := frame.NewFrame()
err = rl.Receive(frameR)
if !frameR.VerifyCRC() {
return 0, errors.E(op, errors.Str("CRC mismatch"))
@@ -79,7 +80,7 @@ func FetchPID(rl goridge.Relay) (int64, error) {
flags := frameR.ReadFlags()
- if flags&(byte(goridge.CONTROL)) == 0 {
+ if flags&(byte(frame.CONTROL)) == 0 {
return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag"))
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index c86d78c4..34735fe6 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -5,7 +5,7 @@ import (
"os/exec"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
@@ -65,7 +65,7 @@ func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (wo
}
// Init new PIPE relay
- relay := goridge.NewPipeRelay(in, out)
+ relay := pipe.NewPipeRelay(in, out)
w.AttachRelay(relay)
// Start the worker
@@ -134,7 +134,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Init new PIPE relay
- relay := goridge.NewPipeRelay(in, out)
+ relay := pipe.NewPipeRelay(in, out)
w.AttachRelay(relay)
// Start the worker
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 9453de1d..40797747 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -469,7 +469,7 @@ func Test_Error(t *testing.T) {
if errors.Is(errors.ErrSoftJob, err) == false {
t.Fatal("error should be of type errors.ErrSoftJob")
}
- assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello")
+ assert.Contains(t, err.Error(), "hello")
}
func Test_NumExecs(t *testing.T) {
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index f721ad66..b08d24e4 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -9,11 +9,12 @@ import (
"github.com/shirou/gopsutil/process"
"github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
@@ -65,7 +66,7 @@ func (f *Factory) listen() error {
return err
}
- rl := goridge.NewSocketRelay(conn)
+ rl := socket.NewSocketRelay(conn)
pid, err := internal.FetchPID(rl)
if err != nil {
return err
@@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error {
}
// waits for Process to connect over socket and returns associated relay of timeout
-func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) {
+func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) {
ticker := time.NewTicker(time.Millisecond * 100)
for {
select {
@@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
if !ok {
continue
}
- return tmp.(*goridge.SocketRelay), nil
+ return tmp.(*socket.Relay), nil
}
}
}
-func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) {
+func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) {
const op = errors.Op("find_relay")
// poll every 1ms for the relay
pollDone := time.NewTimer(f.tout)
@@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error)
if !ok {
continue
}
- return tmp.(*goridge.SocketRelay), nil
+ return tmp.(*socket.Relay), nil
}
}
}
// chan to store relay associated with specific pid
-func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) {
+func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) {
f.relays.Store(pid, relay)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index daa07186..eacb8a8a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,13 +6,13 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
"go.uber.org/multierr"
-
- "github.com/spiral/goridge/v3"
)
type syncWorker struct {
@@ -126,10 +126,10 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (p
}
func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec payload")
+ const op = errors.Op("exec pl")
- frame := goridge.NewFrame()
- frame.WriteVersion(goridge.VERSION_1)
+ fr := frame.NewFrame()
+ fr.WriteVersion(frame.VERSION_1)
// can be 0 here
buf := new(bytes.Buffer)
@@ -137,28 +137,28 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
buf.Write(p.Body)
// Context offset
- frame.WriteOptions(uint32(len(p.Context)))
- frame.WritePayloadLen(uint32(buf.Len()))
- frame.WritePayload(buf.Bytes())
+ fr.WriteOptions(uint32(len(p.Context)))
+ fr.WritePayloadLen(uint32(buf.Len()))
+ fr.WritePayload(buf.Bytes())
- frame.WriteCRC()
+ fr.WriteCRC()
// empty and free the buffer
buf.Truncate(0)
- err := tw.Relay().Send(frame)
+ err := tw.Relay().Send(fr)
if err != nil {
return payload.Payload{}, err
}
- frameR := goridge.NewFrame()
+ frameR := frame.NewFrame()
err = tw.w.Relay().Receive(frameR)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
if frameR == nil {
- return payload.Payload{}, errors.E(op, errors.Str("nil frame received"))
+ return payload.Payload{}, errors.E(op, errors.Str("nil fr received"))
}
if !frameR.VerifyCRC() {
@@ -167,7 +167,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
flags := frameR.ReadFlags()
- if flags&byte(goridge.ERROR) != byte(0) {
+ if flags&byte(frame.ERROR) != byte(0) {
return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload())))
}
@@ -176,11 +176,11 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)"))
}
- payload := payload.Payload{}
- payload.Context = frameR.Payload()[:options[0]]
- payload.Body = frameR.Payload()[options[0]:]
+ pl := payload.Payload{}
+ pl.Context = frameR.Payload()[:options[0]]
+ pl.Body = frameR.Payload()[options[0]:]
- return payload, nil
+ return pl, nil
}
func (tw *syncWorker) String() string {
@@ -219,10 +219,10 @@ func (tw *syncWorker) Kill() error {
return tw.w.Kill()
}
-func (tw *syncWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() relay.Relay {
return tw.w.Relay()
}
-func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl relay.Relay) {
tw.w.AttachRelay(rl)
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 95fa6e06..ae59d611 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -13,7 +13,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
@@ -74,7 +74,7 @@ type Process struct {
mu sync.RWMutex
// communication bus with underlying process.
- relay goridge.Relay
+ relay relay.Relay
// rd in a second part of pipe to read from stderr
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
@@ -131,13 +131,13 @@ func (w *Process) State() internal.State {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) AttachRelay(rl goridge.Relay) {
+func (w *Process) AttachRelay(rl relay.Relay) {
w.relay = rl
}
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) Relay() goridge.Relay {
+func (w *Process) Relay() relay.Relay {
return w.relay
}
diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go
index 02a7f953..38b751ff 100644
--- a/plugins/checker/tests/plugin_test.go
+++ b/plugins/checker/tests/plugin_test.go
@@ -13,7 +13,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/interfaces/status"
"github.com/spiral/roadrunner/v2/plugins/checker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -178,7 +178,7 @@ func TestStatusRPC(t *testing.T) {
func checkRPCStatus(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6005")
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
st := &status.Status{}
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 1a61597c..d7818981 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -17,7 +17,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/mocks"
@@ -193,7 +193,7 @@ func echoHTTP(t *testing.T) {
func resetTest(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
var ret bool
@@ -213,7 +213,7 @@ func resetTest(t *testing.T) {
func informerTest(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go
index 193e84bb..dd06f1c4 100644
--- a/plugins/informer/tests/informer_test.go
+++ b/plugins/informer/tests/informer_test.go
@@ -10,7 +10,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
@@ -84,7 +84,7 @@ func TestInformerInit(t *testing.T) {
func informerRPCTest(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go
index 2d3a3c27..4572bc3f 100644
--- a/plugins/metrics/tests/metrics_test.go
+++ b/plugins/metrics/tests/metrics_test.go
@@ -12,7 +12,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/metrics"
@@ -298,7 +298,7 @@ func observeMetricNotEnoughLabels(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -331,7 +331,7 @@ func observeMetric(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -364,7 +364,7 @@ func counterMetric(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -398,7 +398,7 @@ func registerHistogram(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -434,7 +434,7 @@ func subVector(t *testing.T) {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -480,7 +480,7 @@ func subMetric(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -523,7 +523,7 @@ func setOnHistogram(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -558,7 +558,7 @@ func setWithoutLabels(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -593,7 +593,7 @@ func missingSection(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -629,7 +629,7 @@ func vectorMetric(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -665,7 +665,7 @@ func setMetric(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
@@ -698,7 +698,7 @@ func addMetricsTest(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
m := metrics.Metric{
@@ -718,7 +718,7 @@ func declareMetricsTest(t *testing.T) {
defer func() {
_ = conn.Close()
}()
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret bool
nc := metrics.NamedCollector{
diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go
index 45de67e3..95c3a6b4 100644
--- a/plugins/resetter/tests/resetter_test.go
+++ b/plugins/resetter/tests/resetter_test.go
@@ -10,7 +10,7 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
@@ -83,7 +83,7 @@ func TestResetterInit(t *testing.T) {
func resetterRPCTest(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
var ret bool
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index c8e63496..98242ade 100755
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
"github.com/spiral/roadrunner/v2/interfaces/log"
rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -100,7 +100,7 @@ func (s *Plugin) Serve() chan error {
return
}
- go s.rpc.ServeCodec(goridge.NewCodec(conn))
+ go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn))
}
}()
@@ -161,5 +161,5 @@ func (s *Plugin) Client() (*rpc.Client, error) {
return nil, err
}
- return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil
+ return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
}
diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go
index 347e0330..411b9c54 100644
--- a/plugins/rpc/tests/plugin2.go
+++ b/plugins/rpc/tests/plugin2.go
@@ -6,7 +6,7 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
)
// plugin2 makes a call to the plugin1 via RPC
@@ -30,7 +30,7 @@ func (p2 *Plugin2) Serve() chan error {
errCh <- errors.E(errors.Serve, err)
return
}
- client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
var ret string
err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret)
if err != nil {