diff options
author | Valery Piashchynski <[email protected]> | 2020-12-20 18:28:46 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-20 18:28:46 +0300 |
commit | f4a36c7f684216fb408693a6c494486144df57cf (patch) | |
tree | e1b61bf7e74cb63aa45f9ca0284a4cffe8e06b0e | |
parent | fbd5adde5abae6f7adb7fcdafc226bcd3480d498 (diff) | |
parent | a10d20d20e910ed8fcfbc3bc690aaf17ee338ff3 (diff) |
Merge remote-tracking branch 'origin/2.0' into plugin/redis
# Conflicts:
# go.sum
# pkg/pipe/pipe_factory_test.go
# pkg/pool/static_pool.go
# plugins/rpc/plugin.go
32 files changed, 355 insertions, 384 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d9cf2687..7c52c346 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,13 +14,13 @@ jobs: golang: name: Build (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}}) runs-on: ${{ matrix.os }} - timeout-minutes: 25 + timeout-minutes: 60 strategy: fail-fast: false matrix: php: [ '7.4', '8.0' ] go: [ '1.14', '1.15' ] - os: [ ubuntu-latest, windows-latest, macos-latest ] + os: [ ubuntu-20.04, windows-latest, macos-latest ] steps: - name: Set up Go ${{ matrix.go }} uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go> @@ -37,12 +37,12 @@ jobs: uses: actions/checkout@v2 - name: Get Composer Cache Directory - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} id: composer-cache run: echo "::set-output name=dir::$(composer config cache-files-dir)" - name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer> - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} uses: actions/cache@v2 with: path: ${{ steps.composer-cache.outputs.dir }} @@ -88,7 +88,7 @@ jobs: go test -v -race -cover -tags=debug ./plugins/checker/tests - name: Run golang tests on Linux and MacOS - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/util.txt -covermode=atomic ./util @@ -116,7 +116,7 @@ jobs: cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt - uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action> - if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macos-latest' }} + if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} with: token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-ci/summary.txt @@ -17,7 +17,7 @@ require ( github.com/spf13/viper v1.7.1 github.com/spiral/endure v1.0.0-beta20 github.com/spiral/errors v1.0.6 - github.com/spiral/goridge/v3 v3.0.0-beta7 + github.com/spiral/goridge/v3 v3.0.0-beta8 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/yookoala/gofast v0.4.0 @@ -75,8 +75,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= @@ -94,8 +92,6 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -107,8 +103,6 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= -github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc= -github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -149,8 +143,6 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -274,22 +266,13 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= -github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= -github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -399,8 +382,13 @@ github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.6 h1:berk5ShEILSw6DplUVv9Ea1wGdk2WlVKQpuvDngll0U= github.com/spiral/errors v1.0.6/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/goridge v1.0.4 h1:qnYtI84H0tcYjcbFdFl/VUFQZ0YUE9p+VuU8In4kC/8= +github.com/spiral/goridge v2.1.4+incompatible h1:L15TKrbPEp/G6JfS3jjuvY6whkhfD292XX+1iy9mO2k= github.com/spiral/goridge/v3 v3.0.0-beta7 h1:rJmfVFC/clN7XgsONcu185l36cPJ+MfcFkQSifQXFCM= github.com/spiral/goridge/v3 v3.0.0-beta7/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/goridge/v3 v3.0.0-beta8 h1:x8uXCdhY49U1LEvmehnTaD2El6J9ZHAefRdh/QIZ6A4= +github.com/spiral/goridge/v3 v3.0.0-beta8/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE= +github.com/spiral/roadrunner v1.9.0 h1:hQRAqrpUCOujuuuY4dV5hQWjMhwvMnVZmK2mNON/yl4= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -436,8 +424,6 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw= -go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -502,7 +488,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= @@ -539,16 +524,12 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -651,7 +632,6 @@ gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go index a1015fd6..72da9597 100644 --- a/interfaces/pool/pool.go +++ b/interfaces/pool/pool.go @@ -7,7 +7,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Pool managed set of inner worker processes. @@ -19,9 +19,9 @@ type Pool interface { GetConfig() interface{} // Exec - Exec(rqs internal.Payload) (internal.Payload, error) + Exec(rqs payload.Payload) (payload.Payload, error) - ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) + ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index edbc68d9..f830fdf2 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -5,9 +5,10 @@ import ( "fmt" "time" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Allocator is responsible for worker allocation in the pool @@ -46,17 +47,17 @@ type BaseProcess interface { Kill() error // Relay returns attached to worker goridge relay - Relay() goridge.Relay + Relay() relay.Relay // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl goridge.Relay) + AttachRelay(rl relay.Relay) } type SyncWorker interface { // BaseProcess provides basic functionality for the SyncWorker BaseProcess // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs internal.Payload) (internal.Payload, error) + Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) + ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/internal/protocol.go b/internal/protocol.go index 5aa681eb..a099ce4d 100755 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -5,7 +5,8 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" ) var json = j.ConfigCompatibleWithStandardLibrary @@ -18,39 +19,39 @@ type pidCommand struct { Pid int `json:"pid"` } -func SendControl(rl goridge.Relay, v interface{}) error { +func SendControl(rl relay.Relay, payload interface{}) error { const op = errors.Op("send control frame") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) - frame.WriteFlags(goridge.CONTROL) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) + fr.WriteFlags(frame.CONTROL) - if data, ok := v.([]byte); ok { + if data, ok := payload.([]byte); ok { // check if payload no more that 4Gb if uint32(len(data)) > ^uint32(0) { return errors.E(op, errors.Str("payload is more that 4gb")) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err := rl.Send(frame) + err := rl.Send(fr) if err != nil { return errors.E(op, err) } return nil } - data, err := json.Marshal(v) + data, err := json.Marshal(payload) if err != nil { return errors.E(op, errors.Errorf("invalid payload: %s", err)) } - frame.WritePayloadLen(uint32(len(data))) - frame.WritePayload(data) - frame.WriteCRC() + fr.WritePayloadLen(uint32(len(data))) + fr.WritePayload(data) + fr.WriteCRC() - err = rl.Send(frame) + err = rl.Send(fr) if err != nil { return errors.E(op, err) } @@ -58,14 +59,14 @@ func SendControl(rl goridge.Relay, v interface{}) error { return nil } -func FetchPID(rl goridge.Relay) (int64, error) { +func FetchPID(rl relay.Relay) (int64, error) { const op = errors.Op("fetchPID") err := SendControl(rl, pidCommand{Pid: os.Getpid()}) if err != nil { return 0, errors.E(op, err) } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = rl.Receive(frameR) if !frameR.VerifyCRC() { return 0, errors.E(op, errors.Str("CRC mismatch")) @@ -79,7 +80,7 @@ func FetchPID(rl goridge.Relay) (int64, error) { flags := frameR.ReadFlags() - if flags&(byte(goridge.CONTROL)) == 0 { + if flags&(byte(frame.CONTROL)) == 0 { return 0, errors.E(op, errors.Str("unexpected response, header is missing, no CONTROL flag")) } diff --git a/internal/payload.go b/pkg/payload/payload.go index 63983bad..fac36852 100755 --- a/internal/payload.go +++ b/pkg/payload/payload.go @@ -1,4 +1,4 @@ -package internal +package payload // Payload carries binary header and body to stack and // back to the server. diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index c86d78c4..34735fe6 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -5,7 +5,7 @@ import ( "os/exec" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -65,7 +65,7 @@ func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (wo } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker @@ -134,7 +134,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Init new PIPE relay - relay := goridge.NewPipeRelay(in, out) + relay := pipe.NewPipeRelay(in, out) w.AttachRelay(relay) // Start the worker diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 7045b785..40797747 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -11,7 +11,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/pkg/payload" + workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -146,12 +147,12 @@ func Test_Pipe_Echo(t *testing.T) { } }() - sw, err := worker.From(w) + sw, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -174,12 +175,12 @@ func Test_Pipe_Broken(t *testing.T) { assert.Error(t, err) }() - sw, err := worker.From(w) + sw, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -208,7 +209,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) - sw, err := worker.From(w) + sw, err := workerImpl.From(w) if err != nil { b.Fatal(err) } @@ -228,7 +229,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -249,13 +250,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } }() - sw, err := worker.From(w) + sw, err := workerImpl.From(w) if err != nil { b.Fatal(err) } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -276,13 +277,13 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { } }() - sw, err := worker.From(w) + sw, err := workerImpl.From(w) if err != nil { b.Fatal(err) } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -297,7 +298,7 @@ func Test_Echo(t *testing.T) { t.Fatal(err) } - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } @@ -311,7 +312,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -327,7 +328,7 @@ func Test_BadPayload(t *testing.T) { w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } @@ -342,7 +343,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.Exec(internal.Payload{}) + res, err := syncWorker.Exec(payload.Payload{}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -386,12 +387,12 @@ func Test_Echo_Slow(t *testing.T) { } }() - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -420,12 +421,12 @@ func Test_Broken(t *testing.T) { } }) - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -455,12 +456,12 @@ func Test_Error(t *testing.T) { } }() - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -468,7 +469,7 @@ func Test_Error(t *testing.T) { if errors.Is(errors.ErrSoftJob, err) == false { t.Fatal("error should be of type errors.ErrSoftJob") } - assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello") + assert.Contains(t, err.Error(), "hello") } func Test_NumExecs(t *testing.T) { @@ -486,24 +487,24 @@ func Test_NumExecs(t *testing.T) { } }() - syncWorker, err := worker.From(w) + syncWorker, err := workerImpl.From(w) if err != nil { t.Fatal(err) } - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 837fd183..9cf79fd4 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -9,7 +9,8 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - eventsHandler "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" ) @@ -20,13 +21,13 @@ const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error) +type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) // Before is set of functions that executes BEFORE Exec -type Before func(req internal.Payload) internal.Payload +type Before func(req payload.Payload) payload.Payload // After is set of functions that executes AFTER Exec -type After func(req internal.Payload, resp internal.Payload) internal.Payload +type After func(req payload.Payload, resp payload.Payload) payload.Payload type Options func(p *StaticPool) @@ -71,7 +72,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg: cfg, cmd: cmd, factory: factory, - events: eventsHandler.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), after: make([]After, 0, 0), before: make([]Before, 0, 0), } @@ -79,7 +80,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, p.allocator = newPoolAllocator(factory, cmd) p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) - workers, err := p.allocateSyncWorkers(ctx, p.cfg.NumWorkers) + workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } @@ -139,7 +140,7 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { return sp.ww.RemoveWorker(wb) } -func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("exec") if sp.cfg.Debug { return sp.execDebug(p) @@ -148,7 +149,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -179,7 +180,7 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -194,13 +195,13 @@ func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { return rsp, nil } -func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("exec with context") ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxGetFree, op) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } sw := w.(worker.SyncWorker) @@ -231,7 +232,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } else { sp.ww.PushWorker(sw) @@ -268,7 +269,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { } func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (internal.Payload, error) { + return func(err error, w worker.BaseProcess) (payload.Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed if errors.Is(errors.ErrSoftJob, err) { @@ -287,7 +288,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.ww.PushWorker(w) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } w.State().Set(internal.StateInvalid) @@ -295,10 +296,10 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { errS := w.Stop(bCtx) if errS != nil { - return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) } - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } } @@ -317,10 +318,10 @@ func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Alloc } } -func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { +func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { sw, err := sp.allocator() if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } r, err := sw.(worker.SyncWorker).Exec(p) @@ -333,12 +334,11 @@ func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") var workers []worker.BaseProcess // constant number of stack simplify logic - // TODO do not allocate context on every loop cycle?? for i := int64(0); i < numWorkers; i++ { ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) @@ -346,12 +346,7 @@ func (sp *StaticPool) allocateSyncWorkers(ctx context.Context, numWorkers int64) cancel() return nil, errors.E(op, errors.WorkerAllocate, err) } - sw, err := syncWorker.From(w) - if err != nil { - cancel() - return nil, errors.E(op, err) - } - workers = append(workers, sw) + workers = append(workers, w) cancel() } return workers, nil diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index dd33a1a6..b96e9214 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -14,6 +14,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) @@ -80,7 +81,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -104,7 +105,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -128,7 +129,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -151,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -189,7 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } }) - res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) @@ -212,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -287,11 +288,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -323,14 +324,14 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { assert.Len(t, p.Workers(), 0) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -363,14 +364,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -400,7 +401,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(internal.Payload{Body: []byte("100")}) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -422,7 +423,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(internal.Payload{Body: []byte("100")}) + _, err := p.Exec(payload.Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -430,7 +431,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.Exec(internal.Payload{Body: []byte("100")}) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -456,7 +457,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(internal.StateErrored) } - _, err = p.Exec(internal.Payload{Body: []byte("hello")}) + _, err = p.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -494,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -520,7 +521,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -549,7 +550,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 6d1f0c58..6faa609c 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -11,6 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) const MB = 1024 * 1024 @@ -42,10 +43,10 @@ func newPoolWatcher(pool pool.Pool, events events.Handler, cfg *SupervisorConfig type ttlExec struct { err error - p internal.Payload + p payload.Payload } -func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { +func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("exec_supervised") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -59,7 +60,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) if err != nil { c <- ttlExec{ err: errors.E(op, err), - p: internal.Payload{}, + p: payload.Payload{}, } } @@ -72,10 +73,10 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) for { select { case <-ctx.Done(): - return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) case res := <-c: if res.err != nil { - return internal.Payload{}, res.err + return payload.Payload{}, res.err } return res.p, nil @@ -83,11 +84,11 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) } } -func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { +func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("supervised exec") rsp, err := sp.pool.Exec(p) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } return rsp, nil } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 2e3e7fd2..7dd423b8 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/pipe" "github.com/stretchr/testify/assert" ) @@ -61,7 +61,7 @@ func TestSupervisedPool_Exec(t *testing.T) { for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 50) - _, err = p.Exec(internal.Payload{ + _, err = p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -98,7 +98,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), internal.Payload{ + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -139,7 +139,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(internal.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index f721ad66..b08d24e4 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -9,11 +9,12 @@ import ( "github.com/shirou/gopsutil/process" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/goridge/v3" "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -65,7 +66,7 @@ func (f *Factory) listen() error { return err } - rl := goridge.NewSocketRelay(conn) + rl := socket.NewSocketRelay(conn) pid, err := internal.FetchPID(rl) if err != nil { return err @@ -178,7 +179,7 @@ func (f *Factory) Close(ctx context.Context) error { } // waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { ticker := time.NewTicker(time.Millisecond * 100) for { select { @@ -194,12 +195,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } -func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) { +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { const op = errors.Op("find_relay") // poll every 1ms for the relay pollDone := time.NewTimer(f.tout) @@ -212,13 +213,13 @@ func (f *Factory) findRelay(w worker.BaseProcess) (*goridge.SocketRelay, error) if !ok { continue } - return tmp.(*goridge.SocketRelay), nil + return tmp.(*socket.Relay), nil } } } // chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay goridge.Relay) { +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { f.relays.Store(pid, relay) } diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go index f1a7d637..6a88713a 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -211,7 +211,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -251,7 +251,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -396,7 +396,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -439,7 +439,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(internal.Payload{Body: []byte("hello")}) + res, err := sw.Exec(payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -515,7 +515,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -583,7 +583,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1eb1396e..eacb8a8a 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -6,12 +6,13 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/goridge/v3/interfaces/relay" + "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "go.uber.org/multierr" - - "github.com/spiral/goridge/v3" ) type syncWorker struct { @@ -26,14 +27,14 @@ func From(w worker.BaseProcess) (worker.SyncWorker, error) { } // Exec payload without TTL timeout. -func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) Exec(p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync worker Exec") if len(p.Body) == 0 && len(p.Context) == 0 { - return internal.Payload{}, errors.E(op, errors.Str("payload can not be empty")) + return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } if tw.w.State().Value() != internal.StateReady { - return internal.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) + return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())) } // set last used time @@ -47,7 +48,7 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { tw.w.State().Set(internal.StateErrored) tw.w.State().RegisterExec() } - return internal.Payload{}, err + return payload.Payload{}, err } tw.w.State().Set(internal.StateReady) @@ -57,18 +58,18 @@ func (tw *syncWorker) Exec(p internal.Payload) (internal.Payload, error) { } type wexec struct { - payload internal.Payload + payload payload.Payload err error } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) (internal.Payload, error) { +func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("ExecWithContext") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Str("payload can not be empty")), } return @@ -76,7 +77,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( if tw.w.State().Value() != internal.StateReady { c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.w.State().String())), } return @@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( tw.w.State().RegisterExec() } c <- wexec{ - payload: internal.Payload{}, + payload: payload.Payload{}, err: errors.E(op, err), } return @@ -113,22 +114,22 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p internal.Payload) ( case <-ctx.Done(): err := multierr.Combine(tw.Kill()) if err != nil { - return internal.Payload{}, multierr.Append(err, ctx.Err()) + return payload.Payload{}, multierr.Append(err, ctx.Err()) } - return internal.Payload{}, ctx.Err() + return payload.Payload{}, ctx.Err() case res := <-c: if res.err != nil { - return internal.Payload{}, res.err + return payload.Payload{}, res.err } return res.payload, nil } } -func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) { - const op = errors.Op("exec payload") +func (tw *syncWorker) execPayload(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("exec pl") - frame := goridge.NewFrame() - frame.WriteVersion(goridge.VERSION_1) + fr := frame.NewFrame() + fr.WriteVersion(frame.VERSION_1) // can be 0 here buf := new(bytes.Buffer) @@ -136,50 +137,50 @@ func (tw *syncWorker) execPayload(p internal.Payload) (internal.Payload, error) buf.Write(p.Body) // Context offset - frame.WriteOptions(uint32(len(p.Context))) - frame.WritePayloadLen(uint32(buf.Len())) - frame.WritePayload(buf.Bytes()) + fr.WriteOptions(uint32(len(p.Context))) + fr.WritePayloadLen(uint32(buf.Len())) + fr.WritePayload(buf.Bytes()) - frame.WriteCRC() + fr.WriteCRC() // empty and free the buffer buf.Truncate(0) - err := tw.Relay().Send(frame) + err := tw.Relay().Send(fr) if err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } - frameR := goridge.NewFrame() + frameR := frame.NewFrame() err = tw.w.Relay().Receive(frameR) if err != nil { - return internal.Payload{}, errors.E(op, err) + return payload.Payload{}, errors.E(op, err) } if frameR == nil { - return internal.Payload{}, errors.E(op, errors.Str("nil frame received")) + return payload.Payload{}, errors.E(op, errors.Str("nil fr received")) } if !frameR.VerifyCRC() { - return internal.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) + return payload.Payload{}, errors.E(op, errors.Str("failed to verify CRC")) } flags := frameR.ReadFlags() - if flags&byte(goridge.ERROR) != byte(0) { - return internal.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) + if flags&byte(frame.ERROR) != byte(0) { + return payload.Payload{}, errors.E(op, errors.ErrSoftJob, errors.Str(string(frameR.Payload()))) } options := frameR.ReadOptions() if len(options) != 1 { - return internal.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) + return payload.Payload{}, errors.E(op, errors.Str("options length should be equal 1 (body offset)")) } - payload := internal.Payload{} - payload.Context = frameR.Payload()[:options[0]] - payload.Body = frameR.Payload()[options[0]:] + pl := payload.Payload{} + pl.Context = frameR.Payload()[:options[0]] + pl.Body = frameR.Payload()[options[0]:] - return payload, nil + return pl, nil } func (tw *syncWorker) String() string { @@ -218,10 +219,10 @@ func (tw *syncWorker) Kill() error { return tw.w.Kill() } -func (tw *syncWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() relay.Relay { return tw.w.Relay() } -func (tw *syncWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl relay.Relay) { tw.w.AttachRelay(rl) } diff --git a/pkg/worker/sync_worker_test.go b/pkg/worker/sync_worker_test.go index e224e105..40988b06 100755 --- a/pkg/worker/sync_worker_test.go +++ b/pkg/worker/sync_worker_test.go @@ -4,7 +4,7 @@ import ( "os/exec" "testing" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/stretchr/testify/assert" ) @@ -27,7 +27,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(internal.Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 9a2e76b4..e60ab3f4 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -13,11 +13,11 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - events2 "github.com/spiral/roadrunner/v2/pkg/events" + eventsPkg "github.com/spiral/roadrunner/v2/pkg/events" "go.uber.org/multierr" ) @@ -67,7 +67,7 @@ type Process struct { mu sync.RWMutex // communication bus with underlying process. - relay goridge.Relay + relay relay.Relay // rd in a second part of pipe to read from stderr rd io.Reader // stop signal terminates io.Pipe from reading from stderr @@ -83,7 +83,7 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } w := &Process{ created: time.Now(), - events: events2.NewEventsHandler(), + events: eventsPkg.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), @@ -134,13 +134,13 @@ func (w *Process) State() internal.State { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) AttachRelay(rl goridge.Relay) { +func (w *Process) AttachRelay(rl relay.Relay) { w.relay = rl } // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) Relay() goridge.Relay { +func (w *Process) Relay() relay.Relay { return w.relay } diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go index 02a7f953..38b751ff 100644 --- a/plugins/checker/tests/plugin_test.go +++ b/plugins/checker/tests/plugin_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/status" "github.com/spiral/roadrunner/v2/plugins/checker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -178,7 +178,7 @@ func TestStatusRPC(t *testing.T) { func checkRPCStatus(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6005") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) st := &status.Status{} diff --git a/plugins/http/request.go b/plugins/http/request.go index 5df79b7d..d613bcf6 100644 --- a/plugins/http/request.go +++ b/plugins/http/request.go @@ -10,7 +10,7 @@ import ( j "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/plugins/http/attributes" ) @@ -136,17 +136,17 @@ func (r *Request) Close(log log.Logger) { // Payload request marshaled RoadRunner payload based on PSR7 data. values encode method is JSON. Make sure to open // files prior to calling this method. -func (r *Request) Payload() (internal.Payload, error) { - p := internal.Payload{} +func (r *Request) Payload() (payload.Payload, error) { + p := payload.Payload{} var err error if p.Context, err = json.Marshal(r); err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } if r.Parsed { if p.Body, err = json.Marshal(r.body); err != nil { - return internal.Payload{}, err + return payload.Payload{}, err } } else if r.body != nil { p.Body = r.body.([]byte) diff --git a/plugins/http/response.go b/plugins/http/response.go index 9700a16c..17049ce1 100644 --- a/plugins/http/response.go +++ b/plugins/http/response.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" ) // Response handles PSR7 response logic. @@ -23,7 +23,7 @@ type Response struct { } // NewResponse creates new response based on given pool payload. -func NewResponse(p internal.Payload) (*Response, error) { +func NewResponse(p payload.Payload) (*Response, error) { r := &Response{Body: p.Body} if err := json.Unmarshal(p.Context, r); err != nil { return nil, err diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go index 1a61597c..d7818981 100644 --- a/plugins/http/tests/http_test.go +++ b/plugins/http/tests/http_test.go @@ -17,7 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/mocks" @@ -193,7 +193,7 @@ func echoHTTP(t *testing.T) { func resetTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool @@ -213,7 +213,7 @@ func resetTest(t *testing.T) { func informerTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/http/tests/response_test.go b/plugins/http/tests/response_test.go index a526fe03..7901a0d1 100644 --- a/plugins/http/tests/response_test.go +++ b/plugins/http/tests/response_test.go @@ -6,7 +6,7 @@ import ( "net/http" "testing" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" http2 "github.com/spiral/roadrunner/v2/plugins/http" "github.com/stretchr/testify/assert" ) @@ -45,13 +45,13 @@ func (tw *testWriter) Push(target string, opts *http.PushOptions) error { } func TestNewResponse_Error(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{Context: []byte(`invalid payload`)}) + r, err := http2.NewResponse(payload.Payload{Context: []byte(`invalid payload`)}) assert.Error(t, err) assert.Nil(t, r) } func TestNewResponse_Write(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), Body: []byte(`sample body`), }) @@ -68,7 +68,7 @@ func TestNewResponse_Write(t *testing.T) { } func TestNewResponse_Stream(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -92,7 +92,7 @@ func TestNewResponse_Stream(t *testing.T) { } func TestNewResponse_StreamError(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"key":["value"]},"status": 301}`), }) @@ -112,7 +112,7 @@ func TestNewResponse_StreamError(t *testing.T) { } func TestWrite_HandlesPush(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Http2-Push":["/test.js"],"content-type":["text/html"]},"status": 200}`), }) @@ -127,7 +127,7 @@ func TestWrite_HandlesPush(t *testing.T) { } func TestWrite_HandlesTrailers(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte(`{"headers":{"Trailer":["foo, bar", "baz"],"foo":["test"],"bar":["demo"]},"status": 200}`), }) @@ -146,7 +146,7 @@ func TestWrite_HandlesTrailers(t *testing.T) { } func TestWrite_HandlesHandlesWhitespacesInTrailer(t *testing.T) { - r, err := http2.NewResponse(internal.Payload{ + r, err := http2.NewResponse(payload.Payload{ Context: []byte( `{"headers":{"Trailer":["foo\t,bar , baz"],"foo":["a"],"bar":["b"],"baz":["c"]},"status": 200}`), }) diff --git a/plugins/informer/tests/informer_test.go b/plugins/informer/tests/informer_test.go index 193e84bb..dd06f1c4 100644 --- a/plugins/informer/tests/informer_test.go +++ b/plugins/informer/tests/informer_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" @@ -84,7 +84,7 @@ func TestInformerInit(t *testing.T) { func informerRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. list := struct { // Workers is list of workers. diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go index 2d3a3c27..4572bc3f 100644 --- a/plugins/metrics/tests/metrics_test.go +++ b/plugins/metrics/tests/metrics_test.go @@ -12,7 +12,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/metrics" @@ -298,7 +298,7 @@ func observeMetricNotEnoughLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -331,7 +331,7 @@ func observeMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -364,7 +364,7 @@ func counterMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -398,7 +398,7 @@ func registerHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -434,7 +434,7 @@ func subVector(t *testing.T) { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -480,7 +480,7 @@ func subMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -523,7 +523,7 @@ func setOnHistogram(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -558,7 +558,7 @@ func setWithoutLabels(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -593,7 +593,7 @@ func missingSection(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -629,7 +629,7 @@ func vectorMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -665,7 +665,7 @@ func setMetric(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ @@ -698,7 +698,7 @@ func addMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool m := metrics.Metric{ @@ -718,7 +718,7 @@ func declareMetricsTest(t *testing.T) { defer func() { _ = conn.Close() }() - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret bool nc := metrics.NamedCollector{ diff --git a/plugins/reload/tests/reload_plugin_test.go b/plugins/reload/tests/reload_plugin_test.go index d2fad28d..2ba9e256 100644 --- a/plugins/reload/tests/reload_plugin_test.go +++ b/plugins/reload/tests/reload_plugin_test.go @@ -4,7 +4,6 @@ import ( "io" "io/ioutil" "math/rand" - "net/http" "os" "os/signal" "path/filepath" @@ -14,11 +13,12 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/spiral/endure" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/mocks" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" - "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/reload" "github.com/spiral/roadrunner/v2/plugins/resetter" "github.com/spiral/roadrunner/v2/plugins/server" @@ -27,7 +27,8 @@ import ( const testDir string = "unit_tests" const testCopyToDir string = "unit_tests_copied" -const hugeNumberOfFiles uint = 5000 +const dir1 string = "dir1" +const hugeNumberOfFiles uint = 500 func TestReloadInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) @@ -43,13 +44,20 @@ func TestReloadInit(t *testing.T) { err = os.Mkdir(testDir, 0755) assert.NoError(t, err) - defer func() { - assert.NoError(t, freeResources(testDir)) - }() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").Times(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -58,9 +66,7 @@ func TestReloadInit(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -101,9 +107,9 @@ func TestReloadInit(t *testing.T) { }() t.Run("ReloadTestInit", reloadTestInit) - reloadHTTPLiveAfterReset(t, "22388") wg.Wait() + assert.NoError(t, freeResources(testDir)) } func reloadTestInit(t *testing.T) { @@ -124,19 +130,26 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - }() + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file was updated", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -145,9 +158,7 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -158,7 +169,7 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 100) + tt := time.NewTimer(time.Second * 60) go func() { defer wg.Done() @@ -188,22 +199,20 @@ func TestReloadHugeNumberOfFiles(t *testing.T) { }() t.Run("ReloadTestHugeNumberOfFiles", reloadHugeNumberOfFiles) - ttt := time.Now() t.Run("ReloadRandomlyChangeFile", randomlyChangeFile) - if time.Since(ttt).Seconds() > 80 { - t.Fatal("spend too much time on reloading") - } - reloadHTTPLiveAfterReset(t, "22388") wg.Wait() + + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) } func randomlyChangeFile(t *testing.T) { - // we know, that directory contains 5000 files (0-4999) + // we know, that directory contains 500 files (0-499) // let's try to randomly change it - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) rNum := rand.Int63n(int64(hugeNumberOfFiles)) // nolint:gosec err := ioutil.WriteFile(filepath.Join(testDir, "file_"+strconv.Itoa(int(rNum))+".txt"), []byte("Hello, Gophers!"), 0755) // nolint:gosec @@ -229,16 +238,23 @@ func TestReloadFilterFileExt(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) + assert.NoError(t, os.Mkdir(testDir, 0755)) - defer func() { - assert.NoError(t, freeResources(testDir)) - }() + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(100) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").Times(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -247,9 +263,7 @@ func TestReloadFilterFileExt(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -260,7 +274,7 @@ func TestReloadFilterFileExt(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 40) + tt := time.NewTimer(time.Second * 60) go func() { defer wg.Done() @@ -290,15 +304,11 @@ func TestReloadFilterFileExt(t *testing.T) { }() t.Run("ReloadMakeFiles", reloadMakeFiles) - ttt := time.Now() t.Run("ReloadFilteredExt", reloadFilteredExt) - if time.Since(ttt).Seconds() > 20 { - t.Fatal("spend too much time on reloading") - } - - reloadHTTPLiveAfterReset(t, "27388") wg.Wait() + + assert.NoError(t, freeResources(testDir)) } func reloadMakeFiles(t *testing.T) { @@ -336,7 +346,7 @@ func reloadFilteredExt(t *testing.T) { } // Should be events only about creating files with txt ext -func TestReloadCopy3k(t *testing.T) { +func TestReloadCopy500(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -348,23 +358,29 @@ func TestReloadCopy3k(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) - err = os.Mkdir("dir1", 0755) - assert.NoError(t, err) - - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - }() + assert.NoError(t, freeResources(dir1)) + + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) + assert.NoError(t, os.Mkdir(dir1, 0755)) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + // + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) + mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was removed from watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Debug("file was updated", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(50) + mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1) + mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1) + mockLogger.EXPECT().Info("HTTP listeners successfully re-added").MinTimes(1) + mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -373,9 +389,7 @@ func TestReloadCopy3k(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -386,7 +400,7 @@ func TestReloadCopy3k(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - tt := time.NewTimer(time.Second * 220) + tt := time.NewTimer(time.Second * 120) go func() { defer wg.Done() @@ -398,6 +412,7 @@ func TestReloadCopy3k(t *testing.T) { if err != nil { assert.FailNow(t, "error", err.Error()) } + return case <-sig: err = cont.Stop() if err != nil { @@ -426,19 +441,16 @@ func TestReloadCopy3k(t *testing.T) { // 3 // Recursive - t.Run("ReloadMake3kFiles", reloadMake3kFiles) - ttt := time.Now() + t.Run("ReloadMake300Files", reloadMake300Files) t.Run("ReloadCopyFiles", reloadCopyFiles) - if time.Since(ttt).Seconds() > 120 { - t.Fatal("spend too much time on copy") - } - t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive) t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs) t.Run("RemoveFilesSupport", removeFilesSupport) t.Run("ReloadMoveSupport", reloadMoveSupport) - reloadHTTPLiveAfterReset(t, "37388") + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) + assert.NoError(t, freeResources(dir1)) wg.Wait() } @@ -446,11 +458,11 @@ func TestReloadCopy3k(t *testing.T) { func reloadMoveSupport(t *testing.T) { t.Run("MoveSupportCopy", copyFilesRecursive) // move some files - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(9) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec @@ -482,11 +494,11 @@ func reloadMoveSupport(t *testing.T) { func removeFilesSupport(t *testing.T) { // remove some files - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(10) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec @@ -509,8 +521,8 @@ func removeFilesSupport(t *testing.T) { "dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9", "dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10", } - err := os.Remove(filepath.Join(dirs[rDir], "file_"+strconv.Itoa(int(rNum))+ext[rExt])) - assert.NoError(t, err) + // here can be a situation, when file already deleted + _ = os.Remove(filepath.Join(dirs[rDir], "file_"+strconv.Itoa(int(rNum))+ext[rExt])) } } @@ -540,11 +552,11 @@ func randomChangesInRecursiveDirs(t *testing.T) { "foo_", // should be created "bar_", // should be created } - for i := 0; i < 50; i++ { + for i := 0; i < 10; i++ { // rand sleep - rSleep := rand.Int63n(1000) // nolint:gosec + rSleep := rand.Int63n(500) // nolint:gosec time.Sleep(time.Millisecond * time.Duration(rSleep)) - rNum := rand.Int63n(int64(200)) // nolint:gosec + rNum := rand.Int63n(int64(100)) // nolint:gosec rDir := rand.Int63n(10) // nolint:gosec rExt := rand.Int63n(3) // nolint:gosec rName := rand.Int63n(3) // nolint:gosec @@ -583,19 +595,18 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) + + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) // recreate files - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".abc")) } - for i := uint(0); i < 200; i++ { + for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".def")) } @@ -603,7 +614,7 @@ func reloadCopyFiles(t *testing.T) { assert.NoError(t, err) } -func reloadMake3kFiles(t *testing.T) { +func reloadMake300Files(t *testing.T) { for i := uint(0); i < 100; i++ { assert.NoError(t, makeFile("file_"+strconv.Itoa(int(i))+".txt")) } @@ -627,25 +638,21 @@ func TestReloadNoRecursion(t *testing.T) { // try to remove, skip error assert.NoError(t, freeResources(testDir)) assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - err = os.Mkdir(testDir, 0755) - assert.NoError(t, err) + assert.NoError(t, freeResources(dir1)) - err = os.Mkdir("dir1", 0755) - assert.NoError(t, err) + assert.NoError(t, os.Mkdir(testDir, 0755)) + assert.NoError(t, os.Mkdir(dir1, 0755)) + assert.NoError(t, os.Mkdir(testCopyToDir, 0755)) - err = os.Mkdir(testCopyToDir, 0755) - assert.NoError(t, err) + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) - defer func() { - assert.NoError(t, freeResources(testDir)) - assert.NoError(t, freeResources(testCopyToDir)) - assert.NoError(t, freeResources("dir1")) - }() + // http server should not be restarted. all event from wrong file extensions should be skipped + mockLogger.EXPECT().Debug("http handler response received", "elapsed", gomock.Any(), "remote address", "127.0.0.1").Times(1) err = cont.RegisterAll( cfg, - &logger.ZapLogger{}, + mockLogger, &server.Plugin{}, &httpPlugin.Plugin{}, &reload.Plugin{}, @@ -654,9 +661,7 @@ func TestReloadNoRecursion(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -695,35 +700,19 @@ func TestReloadNoRecursion(t *testing.T) { } } }() + t.Run("ReloadMakeFiles", reloadMakeFiles) // make files in the testDir t.Run("ReloadCopyFilesRecursive", reloadCopyFiles) - reloadHTTPLiveAfterReset(t, "22766") - wg.Wait() + + assert.NoError(t, freeResources(testDir)) + assert.NoError(t, freeResources(testCopyToDir)) + assert.NoError(t, freeResources(dir1)) } // ======================================================================== -func reloadHTTPLiveAfterReset(t *testing.T, port string) { - req, err := http.NewRequest("GET", "http://localhost:"+port, nil) - assert.NoError(t, err) - - r, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } - - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) - - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "hello world", string(b)) - - err = r.Body.Close() - assert.NoError(t, err) -} - func freeResources(path string) error { return os.RemoveAll(path) } diff --git a/plugins/resetter/tests/resetter_test.go b/plugins/resetter/tests/resetter_test.go index 45de67e3..95c3a6b4 100644 --- a/plugins/resetter/tests/resetter_test.go +++ b/plugins/resetter/tests/resetter_test.go @@ -10,7 +10,7 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" @@ -83,7 +83,7 @@ func TestResetterInit(t *testing.T) { func resetterRPCTest(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. var ret bool diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 8d308561..98242ade 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -7,10 +7,10 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" - config2 "github.com/spiral/roadrunner/v2/interfaces/config" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/interfaces/log" rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" + "github.com/spiral/roadrunner/v2/plugins/config" ) // PluginName contains default plugin name. @@ -32,7 +32,7 @@ type Plugin struct { } // Init rpc service. Must return true if service is enabled. -func (s *Plugin) Init(cfg config2.Configurer, log log.Logger) error { +func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { const op = errors.Op("RPC Init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -100,7 +100,7 @@ func (s *Plugin) Serve() chan error { return } - go s.rpc.ServeCodec(goridge.NewCodec(conn)) + go s.rpc.ServeCodec(goridgeRpc.NewCodec(conn)) } }() @@ -161,5 +161,5 @@ func (s *Plugin) Client() (*rpc.Client, error) { return nil, err } - return rpc.NewClientWithCodec(goridge.NewClientCodec(conn)), nil + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil } diff --git a/plugins/rpc/tests/plugin2.go b/plugins/rpc/tests/plugin2.go index 347e0330..411b9c54 100644 --- a/plugins/rpc/tests/plugin2.go +++ b/plugins/rpc/tests/plugin2.go @@ -6,7 +6,7 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/goridge/v3" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" ) // plugin2 makes a call to the plugin1 via RPC @@ -30,7 +30,7 @@ func (p2 *Plugin2) Serve() chan error { errCh <- errors.E(errors.Serve, err) return } - client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) var ret string err = client.Call("rpc_test.plugin1.Hello", "Valery", &ret) if err != nil { diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index f613cf98..9d7812a8 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -8,7 +8,7 @@ import ( config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" plugin "github.com/spiral/roadrunner/v2/plugins/server" @@ -47,7 +47,7 @@ func (f *Foo) Serve() chan error { const op = errors.Op("serve") // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 72b4d1a8..e5b139d4 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -7,7 +7,7 @@ import ( config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -31,7 +31,7 @@ func (f *Foo2) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index aa741deb..866116a7 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -7,7 +7,7 @@ import ( config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -31,7 +31,7 @@ func (f *Foo3) Serve() chan error { conf := &plugin.Config{} // test payload for echo - r := internal.Payload{ + r := payload.Payload{ Context: nil, Body: []byte(Response), } |