diff options
-rw-r--r-- | .github/workflows/build.yml | 10 | ||||
-rwxr-xr-x | go.mod | 2 | ||||
-rwxr-xr-x | go.sum | 4 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 6 | ||||
-rwxr-xr-x | internal/protocol.go | 37 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 6 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 15 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 38 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 8 | ||||
-rw-r--r-- | plugins/checker/tests/plugin_test.go | 4 | ||||
-rw-r--r-- | plugins/http/tests/http_test.go | 6 | ||||
-rw-r--r-- | plugins/informer/tests/informer_test.go | 4 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 28 | ||||
-rw-r--r-- | plugins/reload/tests/reload_plugin_test.go | 249 | ||||
-rw-r--r-- | plugins/resetter/tests/resetter_test.go | 4 | ||||
-rwxr-xr-x | plugins/rpc/plugin.go | 6 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin2.go | 4 |
18 files changed, 214 insertions, 219 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 46831554..d6644450 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ jobs: golang: name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) runs-on: ${{ matrix.os }} - timeout-minutes: 25 + timeout-minutes: 60 strategy: fail-fast: false matrix: @@ -37,12 +37,12 @@ jobs: uses: actions/checkout@v2 - name: Get Composer Cache Directory - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} id: composer-cache run: echo "::set-output name=dir::$(composer config cache-files-dir)" - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} uses: actions/cache@v2 with: path: ${{ steps.composer-cache.outputs.dir }} @@ -87,7 +87,7 @@ jobs: go test -v -race -cover -tags=debug ./plugins/checker/tests - name: Run golang tests on Linux and MacOS - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe @@ -114,7 +114,7 @@ jobs: cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt - uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action> - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} with: token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-ci/summary.txt @@ -16,7 +16,7 @@ require ( github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta20 github.com/spiral/errors v1.0.6 - github.com/spiral/goridge/v3 v3.0.0-beta7 + github.com/spiral/goridge/v3 v3.0.0-beta8 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/yookoala/gofast v0.4.0 @@ -382,8 +382,12 @@ github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U= github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8= +github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k= github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM= github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4= +github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index 773dd044..f830fdf2 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" @@ -47,10 +47,10 @@ type BaseProcess interface { Kill() error // Relay returns attached to worker goridge relay - Relay() goridge.Relay + Relay() relay.Relay // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl goridge.Relay) + AttachRelay(rl relay.Relay) } type SyncWorker interface { diff --git a/internal/protocol.go b/internal/protocol.go index 5aa681eb..a099ce4d 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -5,7 +5,8 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" ) var json = j.ConfigCompatibleWithStandardLibrary @@ -18,39 +19,39 @@ type pidCommand struct { Pid int `json:"pid"` } -func SendControl(rl goridge.Relay, v interface{}) error { +func SendControl(rl relay.Relay, payload interface{}) error { const op = errors.Op("send control frame") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) - frame.WriteFlags(goridge.CONTROL) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) + fr.WriteFlags(frame.CONTROL) - if data, ok := v.([]byte); ok { + if data, ok := payload.([]byte); ok { // check if payload no more that 4Gb if uint32(len(data)) > ^uint32(0) { return errors.E(op, errors.Str("payload is more that 4gb")) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err := rl.Send(frame) + err := rl.Send(fr) if err != nil { return errors.E(op, err) } return nil } - data, err := json.Marshal(v) + data, err := json.Marshal(payload) if err != nil { return errors.E(op, errors.Errorf("invalid payload: %s", err)) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err = rl.Send(frame) + err = rl.Send(fr) if err != nil { return errors.E(op, err) } @@ -58,14 +59,14 @@ func SendControl(rl goridge.Relay, v interface{}) error { return nil } -func FetchPID(rl goridge.Relay) (int64, error) { +func FetchPID(rl relay.Relay) (int64, error) { const op = errors.Op("fetchPID") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { return 0, errors.E(op, err) } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = rl.Receive(frameR) if !frameR.VerifyCRC() { return 0, errors.E(op, errors.Str("CRC mismatch")) @@ -79,7 +80,7 @@ func FetchPID(rl goridge.Relay) (int64, error) { flags := frameR.ReadFlags() - if flags&(byte(goridge.CONTROL)) == 0 { + if flags&(byte(frame.CONTROL)) == 0 { return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index c86d78c4..34735fe6 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -5,7 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -65,7 +65,7 @@ func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (wo } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker @@ -134,7 +134,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 9453de1d..40797747 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -469,7 +469,7 @@ func Test_Error(t *testing.T) { if errors.Is(errors.ErrSoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") } - assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") + assert.Contains(t, err.Error(), "hello") } func Test_NumExecs(t *testing.T) { diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index f721ad66..b08d24e4 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -9,11 +9,12 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -65,7 +66,7 @@ func (f *Factory) listen() error { return err } - rl := goridge.NewSocketRelay(conn) + rl := socket.NewSocketRelay(conn) pid, err := internal.FetchPID(rl) if err != nil { return err @@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error { } // waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } -func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } // chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index daa07186..eacb8a8a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,13 +6,13 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" - - "github.com/spiral/goridge/v3" ) type syncWorker struct { @@ -126,10 +126,10 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (p } func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("exec payload") + const op = errors.Op("exec pl") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) // can be 0 here buf := new(bytes.Buffer) @@ -137,28 +137,28 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { buf.Write(p.Body) // Context offset - frame.WriteOptions(uint32(len(p.Context))) - frame.WritePayloadLen(uint32(buf.Len())) - frame.WritePayload(buf.Bytes()) + fr.WriteOptions(uint32(len(p.Context))) + fr.WritePayloadLen(uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) - frame.WriteCRC() + fr.WriteCRC() // empty and free the buffer buf.Truncate(0) - err := tw.Relay().Send(frame) + err := tw.Relay().Send(fr) if err != nil { return payload.Payload{}, err } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return payload.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { @@ -167,7 +167,7 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { flags := frameR.ReadFlags() - if flags&byte(goridge.ERROR) != byte(0) { + if flags&byte(frame.ERROR) != byte(0) { return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } @@ -176,11 +176,11 @@ func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := payload.Payload{} - payload.Context = frameR.Payload()[:options[0]] - payload.Body = frameR.Payload()[options[0]:] + pl := payload.Payload{} + pl.Context = frameR.Payload()[:options[0]] + pl.Body = frameR.Payload()[options[0]:] - return payload, nil + return pl, nil } func (tw *syncWorker) String() string { @@ -219,10 +219,10 @@ func (tw *syncWorker) Kill() error { return tw.w.Kill() } -func (tw *syncWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() relay.Relay { return tw.w.Relay() } -func (tw *syncWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl relay.Relay) { tw.w.AttachRelay(rl) } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 95fa6e06..ae59d611 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" @@ -74,7 +74,7 @@ type Process struct { mu sync.RWMutex // communication bus with underlying process. - relay goridge.Relay + relay relay.Relay // rd in a second part of pipe to read from stderr rd io.Reader // stop signal terminates io.Pipe from reading from stderr @@ -131,13 +131,13 @@ func (w *Process) State() internal.State { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) AttachRelay(rl goridge.Relay) { +func (w *Process) AttachRelay(rl relay.Relay) { w.relay = rl } // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) Relay() goridge.Relay { +func (w *Process) Relay() relay.Relay { return w.relay } diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go index 02a7f953..38b751ff 100644 --- a/plugins/checker/tests/plugin_test.go +++ b/plugins/checker/tests/plugin_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/status" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -178,7 +178,7 @@ func TestStatusRPC(t *testing.T) { func checkRPCStatus(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6005") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) st := &status.Status{} diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 1a61597c..d7818981 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -17,7 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/mocks" @@ -193,7 +193,7 @@ func echoHTTP(t *testing.T) { func resetTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool @@ -213,7 +213,7 @@ func resetTest(t *testing.T) { func informerTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go index 193e84bb..dd06f1c4 100644 --- a/plugins/informer/tests/informer_test.go +++ b/plugins/informer/tests/informer_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -84,7 +84,7 @@ func TestInformerInit(t *testing.T) { func informerRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go index 2d3a3c27..4572bc3f 100644 --- a/plugins/metrics/tests/metrics_test.go +++ b/plugins/metrics/tests/metrics_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/metrics" @@ -298,7 +298,7 @@ func observeMetricNotEnoughLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -331,7 +331,7 @@ func observeMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -364,7 +364,7 @@ func counterMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -398,7 +398,7 @@ func registerHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -434,7 +434,7 @@ func subVector(t *testing.T) { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -480,7 +480,7 @@ func subMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -523,7 +523,7 @@ func setOnHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -558,7 +558,7 @@ func setWithoutLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -593,7 +593,7 @@ func missingSection(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -629,7 +629,7 @@ func vectorMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -665,7 +665,7 @@ func setMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -698,7 +698,7 @@ func addMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool m := metrics.Metric{ @@ -718,7 +718,7 @@ func declareMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go index d2fad28d..2ba9e256 100644 --- a/plugins/reload/tests/reload_plugin_test.go +++ b/plugins/reload/tests/reload_plugin_test.go @@ -4,7 +4,6 @@ import ( "io" "io/ioutil" "math/rand" - "net/http" "os" "os/signal" "path/filepath" @@ -14,11 +13,12 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/spiral/endure" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/reload" "github.com/spiral/roadrunner/v2/plugins/resetter" "github.com/spiral/roadrunner/v2/plugins/server" @@ -27,7 +27,8 @@ import ( const testDir string = "unit_tests" const testCopyToDir string = "unit_tests_copied" -const hugeNumberOfFiles uint = 5000 +const dir1 string = "dir1" +const hugeNumberOfFiles uint = 500 func TestReloadInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) @@ -43,13 +44,20 @@ func TestReloadInit(t *testing.T) { err = os.Mkdir(testDir, 0755) assert.NoError(t, err) - defer func() { - assert.NoError(t, freeResources(testDir)) - }() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").Times(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -58,9 +66,7 @@ func TestReloadInit(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -101,9 +107,9 @@ func TestReloadInit(t *testing.T) { }() t.Run("ReloadTestInit", reloadTestInit) - reloadHTTPLiveAfterReset(t, "22388") wg.Wait() + assert.NoError(t, freeResources(testDir)) } func reloadTestInit(t *testing.T) { @@ -124,19 +130,26 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - }() + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file was updated", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -145,9 +158,7 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -158,7 +169,7 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 100) + tt := time.NewTimer(time.Second * 60) go func() { defer wg.Done() @@ -188,22 +199,20 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { }() t.Run("ReloadTestHugeNumberOfFiles", reloadHugeNumberOfFiles) - ttt := time.Now() t.Run("ReloadRandomlyChangeFile", randomlyChangeFile) - if time.Since(ttt).Seconds() > 80 { - t.Fatal("spend too much time on reloading") - } - reloadHTTPLiveAfterReset(t, "22388") wg.Wait() + + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) } func randomlyChangeFile(t *testing.T) { - // we know, that directory contains 5000 files (0-4999) + // we know, that directory contains 500 files (0-499) // let's try to randomly change it - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) rNum := rand.Int63n(int64(hugeNumberOfFiles)) // nolint:gosec err := ioutil.WriteFile(filepath.Join(testDir, "file_"+strconv.Itoa(int(rNum))+".txt"), []byte("Hello, Gophers!"), 0755) // nolint:gosec @@ -229,16 +238,23 @@ func TestReloadFilterFileExt(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) + assert.NoError(t, os.Mkdir(testDir, 0755)) - defer func() { - assert.NoError(t, freeResources(testDir)) - }() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(100) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").Times(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -247,9 +263,7 @@ func TestReloadFilterFileExt(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -260,7 +274,7 @@ func TestReloadFilterFileExt(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 40) + tt := time.NewTimer(time.Second * 60) go func() { defer wg.Done() @@ -290,15 +304,11 @@ func TestReloadFilterFileExt(t *testing.T) { }() t.Run("ReloadMakeFiles", reloadMakeFiles) - ttt := time.Now() t.Run("ReloadFilteredExt", reloadFilteredExt) - if time.Since(ttt).Seconds() > 20 { - t.Fatal("spend too much time on reloading") - } - - reloadHTTPLiveAfterReset(t, "27388") wg.Wait() + + assert.NoError(t, freeResources(testDir)) } func reloadMakeFiles(t *testing.T) { @@ -336,7 +346,7 @@ func reloadFilteredExt(t *testing.T) { } // Should be events only about creating files with txt ext -func TestReloadCopy3k(t *testing.T) { +func TestReloadCopy500(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -348,23 +358,29 @@ func TestReloadCopy3k(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) - err = os.Mkdir("dir1", 0755) - assert.NoError(t, err) - - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - }() + assert.NoError(t, freeResources(dir1)) + + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) + assert.NoError(t, os.Mkdir(dir1, 0755)) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + // + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was removed from watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was updated", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -373,9 +389,7 @@ func TestReloadCopy3k(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -386,7 +400,7 @@ func TestReloadCopy3k(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 220) + tt := time.NewTimer(time.Second * 120) go func() { defer wg.Done() @@ -398,6 +412,7 @@ func TestReloadCopy3k(t *testing.T) { if err != nil { assert.FailNow(t, "error", err.Error()) } + return case <-sig: err = cont.Stop() if err != nil { @@ -426,19 +441,16 @@ func TestReloadCopy3k(t *testing.T) { // 3 // Recursive - t.Run("ReloadMake3kFiles", reloadMake3kFiles) - ttt := time.Now() + t.Run("ReloadMake300Files", reloadMake300Files) t.Run("ReloadCopyFiles", reloadCopyFiles) - if time.Since(ttt).Seconds() > 120 { - t.Fatal("spend too much time on copy") - } - t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive) t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs) t.Run("RemoveFilesSupport", removeFilesSupport) t.Run("ReloadMoveSupport", reloadMoveSupport) - reloadHTTPLiveAfterReset(t, "37388") + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) + assert.NoError(t, freeResources(dir1)) wg.Wait() } @@ -446,11 +458,11 @@ func TestReloadCopy3k(t *testing.T) { func reloadMoveSupport(t *testing.T) { t.Run("MoveSupportCopy", copyFilesRecursive) // move some files - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(9) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec @@ -482,11 +494,11 @@ func reloadMoveSupport(t *testing.T) { func removeFilesSupport(t *testing.T) { // remove some files - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(10) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec @@ -509,8 +521,8 @@ func removeFilesSupport(t *testing.T) { "dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9", "dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10", } - err := os.Remove(filepath.Join(dirs[rDir], "file_"+strconv.Itoa(int(rNum))+ext[rExt])) - assert.NoError(t, err) + // here can be a situation, when file already deleted + _ = os.Remove(filepath.Join(dirs[rDir], "file_"+strconv.Itoa(int(rNum))+ext[rExt])) } } @@ -540,11 +552,11 @@ func randomChangesInRecursiveDirs(t *testing.T) { "foo_", // should be created "bar_", // should be created } - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(10) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec rName := rand.Int63n(3) // nolint:gosec @@ -583,19 +595,18 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) + + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) // recreate files - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc")) } - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def")) } @@ -603,7 +614,7 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, err) } -func reloadMake3kFiles(t *testing.T) { +func reloadMake300Files(t *testing.T) { for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } @@ -627,25 +638,21 @@ func TestReloadNoRecursion(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) + assert.NoError(t, freeResources(dir1)) - err = os.Mkdir("dir1", 0755) - assert.NoError(t, err) + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(dir1, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - }() + // http server should not be restarted. all event from wrong file extensions should be skipped + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -654,9 +661,7 @@ func TestReloadNoRecursion(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -695,35 +700,19 @@ func TestReloadNoRecursion(t *testing.T) { } } }() + t.Run("ReloadMakeFiles", reloadMakeFiles) // make files in the testDir t.Run("ReloadCopyFilesRecursive", reloadCopyFiles) - reloadHTTPLiveAfterReset(t, "22766") - wg.Wait() + + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) + assert.NoError(t, freeResources(dir1)) } // ======================================================================== -func reloadHTTPLiveAfterReset(t *testing.T, port string) { - req, err := http.NewRequest("GET", "http://localhost:"+port, nil) - assert.NoError(t, err) - - r, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "hello world", string(b)) - - err = r.Body.Close() - assert.NoError(t, err) -} - func freeResources(path string) error { return os.RemoveAll(path) } diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go index 45de67e3..95c3a6b4 100644 --- a/plugins/resetter/tests/resetter_test.go +++ b/plugins/resetter/tests/resetter_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" @@ -83,7 +83,7 @@ func TestResetterInit(t *testing.T) { func resetterRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index c8e63496..98242ade 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/log" rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" "github.com/spiral/roadrunner/v2/plugins/config" @@ -100,7 +100,7 @@ func (s *Plugin) Serve() chan error { return } - go s.rpc.ServeCodec(goridge.NewCodec(conn)) + go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn)) } }() @@ -161,5 +161,5 @@ func (s *Plugin) Client() (*rpc.Client, error) { return nil, err } - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil } diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 347e0330..411b9c54 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -6,7 +6,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" ) // plugin2 makes a call to the plugin1 via RPC @@ -30,7 +30,7 @@ func (p2 *Plugin2) Serve() chan error { errCh <- errors.E(errors.Serve, err) return } - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret string err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret) if err != nil { |