diff options
author | Valery Piashchynski <[email protected]> | 2020-12-20 17:42:56 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-20 17:42:56 +0300 |
commit | fe0109a3b19ee82bd7bfbb57eae6b5b5166d7068 (patch) | |
tree | 52fd2960cb440c99163738fcfeeb6df465662155 | |
parent | 349088db2081704e15699ce1e6f6b1f8898bd336 (diff) |
Latest goridge version support
-rwxr-xr-x | go.mod | 2 | ||||
-rwxr-xr-x | go.sum | 4 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 6 | ||||
-rwxr-xr-x | internal/protocol.go | 37 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 6 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 15 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 38 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 8 | ||||
-rw-r--r-- | plugins/checker/tests/plugin_test.go | 4 | ||||
-rw-r--r-- | plugins/http/tests/http_test.go | 6 | ||||
-rw-r--r-- | plugins/informer/tests/informer_test.go | 4 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 28 | ||||
-rw-r--r-- | plugins/resetter/tests/resetter_test.go | 4 | ||||
-rwxr-xr-x | plugins/rpc/plugin.go | 6 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin2.go | 4 |
16 files changed, 90 insertions, 84 deletions
@@ -16,7 +16,7 @@ require ( github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta20 github.com/spiral/errors v1.0.6 - github.com/spiral/goridge/v3 v3.0.0-beta7 + github.com/spiral/goridge/v3 v3.0.0-beta8 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/yookoala/gofast v0.4.0 @@ -382,8 +382,12 @@ github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U= github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8= +github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k= github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM= github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4= +github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index 773dd044..f830fdf2 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" @@ -47,10 +47,10 @@ type BaseProcess interface { Kill() error // Relay returns attached to worker goridge relay - Relay() goridge.Relay + Relay() relay.Relay // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl goridge.Relay) + AttachRelay(rl relay.Relay) } type SyncWorker interface { diff --git a/internal/protocol.go b/internal/protocol.go index 5aa681eb..a099ce4d 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -5,7 +5,8 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" ) var json = j.ConfigCompatibleWithStandardLibrary @@ -18,39 +19,39 @@ type pidCommand struct { Pid int `json:"pid"` } -func SendControl(rl goridge.Relay, v interface{}) error { +func SendControl(rl relay.Relay, payload interface{}) error { const op = errors.Op("send control frame") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) - frame.WriteFlags(goridge.CONTROL) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) + fr.WriteFlags(frame.CONTROL) - if data, ok := v.([]byte); ok { + if data, ok := payload.([]byte); ok { // check if payload no more that 4Gb if uint32(len(data)) > ^uint32(0) { return errors.E(op, errors.Str("payload is more that 4gb")) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err := rl.Send(frame) + err := rl.Send(fr) if err != nil { return errors.E(op, err) } return nil } - data, err := json.Marshal(v) + data, err := json.Marshal(payload) if err != nil { return errors.E(op, errors.Errorf("invalid payload: %s", err)) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err = rl.Send(frame) + err = rl.Send(fr) if err != nil { return errors.E(op, err) } @@ -58,14 +59,14 @@ func SendControl(rl goridge.Relay, v interface{}) error { return nil } -func FetchPID(rl goridge.Relay) (int64, error) { +func FetchPID(rl relay.Relay) (int64, error) { const op = errors.Op("fetchPID") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { return 0, errors.E(op, err) } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = rl.Receive(frameR) if !frameR.VerifyCRC() { return 0, errors.E(op, errors.Str("CRC mismatch")) @@ -79,7 +80,7 @@ func FetchPID(rl goridge.Relay) (int64, error) { flags := frameR.ReadFlags() - if flags&(byte(goridge.CONTROL)) == 0 { + if flags&(byte(frame.CONTROL)) == 0 { return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index c86d78c4..34735fe6 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -5,7 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -65,7 +65,7 @@ func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (wo } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker @@ -134,7 +134,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 9453de1d..40797747 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -469,7 +469,7 @@ func Test_Error(t *testing.T) { if errors.Is(errors.ErrSoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") } - assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") + assert.Contains(t, err.Error(), "hello") } func Test_NumExecs(t *testing.T) { diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index f721ad66..b08d24e4 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -9,11 +9,12 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -65,7 +66,7 @@ func (f *Factory) listen() error { return err } - rl := goridge.NewSocketRelay(conn) + rl := socket.NewSocketRelay(conn) pid, err := internal.FetchPID(rl) if err != nil { return err @@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error { } // waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } -func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } // chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index daa07186..eacb8a8a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,13 +6,13 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" - - "github.com/spiral/goridge/v3" ) type syncWorker struct { @@ -126,10 +126,10 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (p } func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec payload") + const op = errors.Op("exec pl") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) // can be 0 here buf := new(bytes.Buffer) @@ -137,28 +137,28 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { buf.Write(p.Body) // Context offset - frame.WriteOptions(uint32(len(p.Context))) - frame.WritePayloadLen(uint32(buf.Len())) - frame.WritePayload(buf.Bytes()) + fr.WriteOptions(uint32(len(p.Context))) + fr.WritePayloadLen(uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) - frame.WriteCRC() + fr.WriteCRC() // empty and free the buffer buf.Truncate(0) - err := tw.Relay().Send(frame) + err := tw.Relay().Send(fr) if err != nil { return payload.Payload{}, err } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { @@ -167,7 +167,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { flags := frameR.ReadFlags() - if flags&byte(goridge.ERROR) != byte(0) { + if flags&byte(frame.ERROR) != byte(0) { return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } @@ -176,11 +176,11 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := payload.Payload{} - payload.Context = frameR.Payload()[:options[0]] - payload.Body = frameR.Payload()[options[0]:] + pl := payload.Payload{} + pl.Context = frameR.Payload()[:options[0]] + pl.Body = frameR.Payload()[options[0]:] - return payload, nil + return pl, nil } func (tw *syncWorker) String() string { @@ -219,10 +219,10 @@ func (tw *syncWorker) Kill() error { return tw.w.Kill() } -func (tw *syncWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() relay.Relay { return tw.w.Relay() } -func (tw *syncWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl relay.Relay) { tw.w.AttachRelay(rl) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 95fa6e06..ae59d611 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -74,7 +74,7 @@ type Process struct { mu sync.RWMutex // communication bus with underlying process. - relay goridge.Relay + relay relay.Relay // rd in a second part of pipe to read from stderr rd io.Reader // stop signal terminates io.Pipe from reading from stderr @@ -131,13 +131,13 @@ func (w *Process) State() internal.State { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) AttachRelay(rl goridge.Relay) { +func (w *Process) AttachRelay(rl relay.Relay) { w.relay = rl } // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) Relay() goridge.Relay { +func (w *Process) Relay() relay.Relay { return w.relay } diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go index 02a7f953..38b751ff 100644 --- a/plugins/checker/tests/plugin_test.go +++ b/plugins/checker/tests/plugin_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/status" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -178,7 +178,7 @@ func TestStatusRPC(t *testing.T) { func checkRPCStatus(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6005") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) st := &status.Status{} diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 1a61597c..d7818981 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -17,7 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/mocks" @@ -193,7 +193,7 @@ func echoHTTP(t *testing.T) { func resetTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool @@ -213,7 +213,7 @@ func resetTest(t *testing.T) { func informerTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go index 193e84bb..dd06f1c4 100644 --- a/plugins/informer/tests/informer_test.go +++ b/plugins/informer/tests/informer_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -84,7 +84,7 @@ func TestInformerInit(t *testing.T) { func informerRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go index 2d3a3c27..4572bc3f 100644 --- a/plugins/metrics/tests/metrics_test.go +++ b/plugins/metrics/tests/metrics_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/metrics" @@ -298,7 +298,7 @@ func observeMetricNotEnoughLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -331,7 +331,7 @@ func observeMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -364,7 +364,7 @@ func counterMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -398,7 +398,7 @@ func registerHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -434,7 +434,7 @@ func subVector(t *testing.T) { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -480,7 +480,7 @@ func subMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -523,7 +523,7 @@ func setOnHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -558,7 +558,7 @@ func setWithoutLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -593,7 +593,7 @@ func missingSection(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -629,7 +629,7 @@ func vectorMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -665,7 +665,7 @@ func setMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -698,7 +698,7 @@ func addMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool m := metrics.Metric{ @@ -718,7 +718,7 @@ func declareMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go index 45de67e3..95c3a6b4 100644 --- a/plugins/resetter/tests/resetter_test.go +++ b/plugins/resetter/tests/resetter_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" @@ -83,7 +83,7 @@ func TestResetterInit(t *testing.T) { func resetterRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index c8e63496..98242ade 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/log" rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" "github.com/spiral/roadrunner/v2/plugins/config" @@ -100,7 +100,7 @@ func (s *Plugin) Serve() chan error { return } - go s.rpc.ServeCodec(goridge.NewCodec(conn)) + go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn)) } }() @@ -161,5 +161,5 @@ func (s *Plugin) Client() (*rpc.Client, error) { return nil, err } - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil } diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 347e0330..411b9c54 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -6,7 +6,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" ) // plugin2 makes a call to the plugin1 via RPC @@ -30,7 +30,7 @@ func (p2 *Plugin2) Serve() chan error { errCh <- errors.E(errors.Serve, err) return } - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret string err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret) if err != nil { |