diff options
author | Valery Piashchynski <[email protected]> | 2020-12-21 14:24:45 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-12-21 14:24:45 +0300 |
commit | 8543980775e5f8b12e5e200a0764052cdb4350a5 (patch) | |
tree | c1c6dff8e6bd81bcf51d608c5ed935702911ae81 | |
parent | fd6e9cc403fc0c3857dcf29768429a374bd85636 (diff) | |
parent | 7b32b6b93576ec72b4b7fdf2068e655f869e9cf8 (diff) |
Merge pull request #453 from spiral/plugin/redis
Plugin/redis
45 files changed, 607 insertions, 262 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d6644450..51c49af7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -65,6 +65,7 @@ jobs: - name: Run golang tests on Windows without codecov if: ${{ matrix.os == 'windows-latest' }} run: | + go test -v -race -cover -tags=debug ./util go test -v -race -cover -tags=debug ./pkg/pipe go test -v -race -cover -tags=debug ./pkg/pool go test -v -race -cover -tags=debug ./pkg/socket @@ -84,12 +85,14 @@ jobs: go test -v -race -cover -tags=debug ./plugins/static/tests go test -v -race -cover -tags=debug ./plugins/static go test -v -race -cover -tags=debug ./plugins/headers/tests + go test -v -race -cover -tags=debug ./plugins/redis/tests go test -v -race -cover -tags=debug ./plugins/checker/tests - name: Run golang tests on Linux and MacOS if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }} run: | mkdir ./coverage-ci + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/util.txt -covermode=atomic ./util go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket @@ -110,6 +113,7 @@ jobs: go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/static_root.txt -covermode=atomic ./plugins/static go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./plugins/headers/tests go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/checker.txt -covermode=atomic ./plugins/checker/tests + go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./plugins/redis/tests go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./plugins/reload/tests cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -15,7 +15,7 @@ http: debug: true address: 127.0.0.1:18903 maxRequestSize: 1024 - middleware: [ "" ] + middleware: [ "gzip", "headers" ] uploads: forbid: [ ".php", ".exe", ".bat" ] trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] @@ -36,4 +36,74 @@ http: http2: enabled: false h2c: false - maxConcurrentStreams: 128
\ No newline at end of file + maxConcurrentStreams: 128 + +redis: + # UniversalClient is an abstract client which - based on the provided options - + # can connect to either clusters, or sentinel-backed failover instances + # or simple single-instance servers. This can be useful for testing + # cluster-specific applications locally. + # if the number of addrs is 1 and master_name is empty, a single-node redis Client will be returned + + # if the number of Addrs is two or more, a ClusterClient will be returned + addrs: + - 'localhost:6379' + # if a MasterName is passed a sentinel-backed FailoverClient will be returned + master_name: '' + username: '' + password: '' + db: 0 + sentinel_password: '' + route_by_latency: false + route_randomly: false + dial_timeout: 0 # accepted values [1s, 5m, 3h] + max_retries: 1 + min_retry_backoff: 0 # accepted values [1s, 5m, 3h] + max_retry_backoff: 0 # accepted values [1s, 5m, 3h] + pool_size: 0 + min_idle_conns: 0 + max_conn_age: 0 # accepted values [1s, 5m, 3h] + read_timeout: 0 # accepted values [1s, 5m, 3h] + write_timeout: 0 # accepted values [1s, 5m, 3h] + pool_timeout: 0 # accepted values [1s, 5m, 3h] + idle_timeout: 0 # accepted values [1s, 5m, 3h] + idle_check_freq: 0 # accepted values [1s, 5m, 3h] + read_only: false + +metrics: + # prometheus client address (path /metrics added automatically) + address: localhost:2112 + collect: + app_metric: + type: histogram + help: "Custom application metric" + labels: [ "type" ] + buckets: [ 0.1, 0.2, 0.3, 1.0 ] + # objectives defines the quantile rank estimates with their respective + # absolute error [ for summary only ] + objectives: + - 1.4: 2.3 + - 2.0: 1.4 + +reload: + # sync interval + interval: 1s + # global patterns to sync + patterns: [ ".php" ] + # list of included for sync services + services: + http: + # recursive search for file patterns to add + recursive: true + # ignored folders + ignore: [ "vendor" ] + # service specific file pattens to sync + patterns: [ ".php", ".go",".md", ] + # directories to sync. If recursive is set to true, + # recursive sync will be applied only to the directories in `dirs` section + dirs: [ "." ] + rpc: + recursive: true + patterns: [ ".json" ] + # to include all project directories from workdir, leave `dirs` empty or add a dot "." + dirs: [ "" ]
\ No newline at end of file @@ -24,6 +24,7 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests + go test -v -race -cover -tags=debug -covermode=atomic ./util go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket @@ -43,6 +44,7 @@ test: ## Run application tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/static/tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/headers/tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/checker/tests + go test -v -race -cover -tags=debug -covermode=atomic ./plugins/redis/tests go test -v -race -cover -tags=debug -covermode=atomic ./plugins/reload/tests lint: ## Run application linters @@ -5,8 +5,10 @@ go 1.15 require ( github.com/NYTimes/gziphandler v1.1.1 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect + github.com/alicebob/miniredis/v2 v2.14.1 github.com/fatih/color v1.10.0 github.com/go-ole/go-ole v1.2.4 // indirect + github.com/go-redis/redis/v8 v8.4.4 github.com/gofiber/fiber/v2 v2.3.0 github.com/golang/mock v1.4.4 github.com/hashicorp/go-multierror v1.1.0 @@ -29,6 +29,11 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= +github.com/alicebob/miniredis/v2 v2.14.1 h1:GjlbSeoJ24bzdLRs13HoMEeaRZx9kg5nHoRW7QV/nCs= +github.com/alicebob/miniredis/v2 v2.14.1/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wXxSpQUE9EaJ/Blg= github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -57,6 +62,9 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= @@ -75,6 +83,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= @@ -92,6 +102,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -103,6 +115,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc= +github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -143,6 +157,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -266,13 +282,22 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= +github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= +github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -411,6 +436,8 @@ github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6Ac github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo= github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= @@ -419,6 +446,8 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw= +go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -483,8 +512,10 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY= golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -508,6 +539,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -519,12 +551,16 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -629,6 +665,7 @@ gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/plugins/config/configurer.go b/interfaces/config/interface.go index 00010eae..2a7c67ce 100755 --- a/plugins/config/configurer.go +++ b/interfaces/config/interface.go @@ -14,6 +14,9 @@ type Configurer interface { // Get used to get config section Get(name string) interface{} + // Overwrite used to overwrite particular values in the unmarshalled config + Overwrite(values map[string]interface{}) error + // Has checks if config section exists. Has(name string) bool } diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go deleted file mode 100755 index 51b73501..00000000 --- a/interfaces/factory/factory.go +++ /dev/null @@ -1,22 +0,0 @@ -package worker - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/interfaces/worker" -) - -// Factory is responsible of wrapping given command into tasks WorkerProcess. -type Factory interface { - // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex. - // Process must not be started. - SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error) - - // SpawnWorker creates new WorkerProcess process based on given command. - // Process must not be started. - SpawnWorker(*exec.Cmd) (worker.BaseProcess, error) - - // Close the factory and underlying connections. - Close(ctx context.Context) error -} diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go index 72da9597..22552388 100644 --- a/interfaces/pool/pool.go +++ b/interfaces/pool/pool.go @@ -18,9 +18,10 @@ type Pool interface { // GetConfig returns pool configuration. GetConfig() interface{} - // Exec + // Exec executes task with payload Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext executes task with context which is used with timeout ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. diff --git a/interfaces/redis/interface.go b/interfaces/redis/interface.go new file mode 100644 index 00000000..909c8ca4 --- /dev/null +++ b/interfaces/redis/interface.go @@ -0,0 +1,9 @@ +package redis + +import "github.com/go-redis/redis/v8" + +// Redis in the redis KV plugin interface +type Redis interface { + // GetClient + GetClient() redis.UniversalClient +} diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go index 19e2bf5d..8db8ddcc 100644 --- a/interfaces/worker/factory.go +++ b/interfaces/worker/factory.go @@ -9,10 +9,10 @@ import ( type Factory interface { // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. SpawnWorker(*exec.Cmd) (BaseProcess, error) // Close the factory and underlying connections. - Close(ctx context.Context) error + Close() error } diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index f830fdf2..7f2f8a53 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -40,7 +40,7 @@ type BaseProcess interface { Wait() error // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop(ctx context.Context) error + Stop() error // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! @@ -59,5 +59,5 @@ type SyncWorker interface { // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index 34735fe6..a0e0c258 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -31,7 +31,7 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { @@ -159,6 +159,6 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Close the factory. -func (f *Factory) Close(ctx context.Context) error { +func (f *Factory) Close() error { return nil } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 40797747..0d548b7a 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -20,7 +20,7 @@ func Test_GetState(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) assert.Equal(t, internal.StateStopped, w.State().Value()) @@ -30,7 +30,7 @@ func Test_GetState(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -73,7 +73,7 @@ func Test_Pipe_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - assert.NoError(t, w.Stop(ctx)) + assert.NoError(t, w.Stop()) } func Test_Pipe_StartError(t *testing.T) { @@ -84,7 +84,7 @@ func Test_Pipe_StartError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -97,7 +97,7 @@ func Test_Pipe_PipeError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -110,7 +110,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -118,7 +118,7 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) @@ -128,7 +128,7 @@ func Test_Pipe_Failboot(t *testing.T) { func Test_Pipe_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -136,12 +136,12 @@ func Test_Pipe_Invalid(t *testing.T) { func Test_Pipe_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -165,13 +165,13 @@ func Test_Pipe_Echo(t *testing.T) { func Test_Pipe_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } defer func() { time.Sleep(time.Second) - err = w.Stop(ctx) + err = w.Stop() assert.Error(t, err) }() @@ -191,14 +191,14 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) go func() { if w.Wait() != nil { b.Fail() } }() - err := w.Stop(context.Background()) + err := w.Stop() if err != nil { b.Errorf("error stopping the worker: error %v", err) } @@ -208,7 +208,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) sw, err := workerImpl.From(w) if err != nil { b.Fatal(err) @@ -222,7 +222,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { } }() defer func() { - err := w.Stop(context.Background()) + err := w.Stop() if err != nil { b.Errorf("error stopping the worker: error %v", err) } @@ -238,13 +238,13 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -265,13 +265,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -293,7 +293,7 @@ func Test_Echo(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -306,7 +306,7 @@ func Test_Echo(t *testing.T) { assert.NoError(t, syncWorker.Wait()) }() defer func() { - err := syncWorker.Stop(ctx) + err := syncWorker.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -326,7 +326,7 @@ func Test_BadPayload(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) syncWorker, err := workerImpl.From(w) if err != nil { @@ -337,7 +337,7 @@ func Test_BadPayload(t *testing.T) { assert.NoError(t, syncWorker.Wait()) }() defer func() { - err := syncWorker.Stop(ctx) + err := syncWorker.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -356,12 +356,12 @@ func Test_String(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -376,12 +376,12 @@ func Test_Echo_Slow(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -406,7 +406,7 @@ func Test_Broken(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -437,20 +437,20 @@ func Test_Broken(t *testing.T) { t.Fail() } mu.Unlock() - assert.Error(t, w.Stop(ctx)) + assert.Error(t, w.Stop()) } func Test_Error(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -476,12 +476,12 @@ func Test_NumExecs(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 9cf79fd4..2a06b255 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -3,6 +3,7 @@ package pool import ( "context" "os/exec" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" @@ -18,8 +19,6 @@ import ( // StopRequest can be sent by worker to indicate that restart is required. const StopRequest = "{\"stop\":true}" -var bCtx = context.Background() - // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) @@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, before: make([]Before, 0, 0), } - p.allocator = newPoolAllocator(factory, cmd) + p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) - workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) + workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } @@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { sw.State().Set(internal.StateInvalid) - err = sw.Stop(bCtx) + err = sw.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } @@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) return payload.Payload{}, errors.E(op, err) } - sw := w.(worker.SyncWorker) - // apply all before function if len(sp.before) > 0 { for i := 0; i < len(sp.before); i++ { @@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) } } - rsp, err := sw.ExecWithContext(ctx, rqs) + rsp, err := w.ExecWithTimeout(ctx, rqs) if err != nil { - return sp.errEncoder(err, sw) + return sp.errEncoder(err, w) } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - sw.State().Set(internal.StateInvalid) - err = sw.Stop(bCtx) + w.State().Set(internal.StateInvalid) + err = w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } return sp.Exec(rqs) } - if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { return payload.Payload{}, errors.E(op, err) } } else { - sp.ww.PushWorker(sw) + sp.ww.PushWorker(w) } // apply all after functions @@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) return rsp, nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { @@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke // else if err not nil - return error return nil, errors.E(op, err) } - return w, nil + return w.(worker.SyncWorker), nil } // Destroy all underlying stack (but let them to complete the task). @@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } w.State().Set(internal.StateInvalid) - err = w.Stop(bCtx) + err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } @@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { w.State().Set(internal.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - errS := w.Stop(bCtx) + errS := w.Stop() if errS != nil { return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) @@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } -func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { +func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.BaseProcess, error) { - w, err := factory.SpawnWorkerWithContext(bCtx, cmd()) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + w, err := factory.SpawnWorkerWithTimeout(ctx, cmd()) if err != nil { return nil, err } @@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { r, err := sw.(worker.SyncWorker).Exec(p) - if stopErr := sw.Stop(context.Background()); stopErr != nil { + if stopErr := sw.Stop(); stopErr != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } @@ -334,20 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") var workers []worker.BaseProcess // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { - ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) - w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) + w, err := sp.allocator() if err != nil { - cancel() return nil, errors.E(op, errors.WorkerAllocate, err) } - workers = append(workers, w) - cancel() + + sw, err := syncWorker.From(w) + if err != nil { + return nil, errors.E(op, err) + } + workers = append(workers, sw) } return workers, nil } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index b96e9214..30345aee 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -190,6 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } }) + time.Sleep(time.Second) res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index b08d24e4..49456bd9 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -85,7 +85,7 @@ type socketSpawn struct { } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { @@ -174,7 +174,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Close socket factory and underlying socket connection. -func (f *Factory) Close(ctx context.Context) error { +func (f *Factory) Close() error { return f.ls.Close() } diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go index 6a88713a..983f3e8e 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -31,7 +31,7 @@ func Test_Tcp_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -39,7 +39,7 @@ func Test_Tcp_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -64,11 +64,11 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { } }() - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -95,7 +95,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -118,7 +118,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) assert.Contains(t, err2.Error(), "failboot") @@ -141,7 +141,7 @@ func Test_Tcp_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -164,7 +164,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -186,7 +186,7 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -201,7 +201,7 @@ func Test_Tcp_Broken(t *testing.T) { defer func() { time.Sleep(time.Second) - err2 := w.Stop(ctx) + err2 := w.Stop() // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection assert.Error(t, err2) }() @@ -235,12 +235,12 @@ func Test_Tcp_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -277,7 +277,7 @@ func Test_Unix_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -285,7 +285,7 @@ func Test_Unix_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -307,7 +307,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -329,7 +329,7 @@ func Test_Unix_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -351,7 +351,7 @@ func Test_Unix_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -372,7 +372,7 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -387,7 +387,7 @@ func Test_Unix_Broken(t *testing.T) { defer func() { time.Sleep(time.Second) - err = w.Stop(ctx) + err = w.Stop() assert.Error(t, err) }() @@ -420,7 +420,7 @@ func Test_Unix_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -428,7 +428,7 @@ func Test_Unix_Echo(t *testing.T) { assert.NoError(t, w.Wait()) }() defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -467,7 +467,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } @@ -475,7 +475,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { assert.NoError(b, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -498,12 +498,12 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -539,11 +539,11 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -566,12 +566,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index eacb8a8a..11992f22 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -63,9 +63,10 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { - const op = errors.Op("ExecWithContext") +func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("ExecWithTimeout") c := make(chan wexec, 1) + go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ @@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error { return tw.w.Wait() } -func (tw *syncWorker) Stop(ctx context.Context) error { - return tw.w.Stop(ctx) +func (tw *syncWorker) Stop() error { + return tw.w.Stop() } func (tw *syncWorker) Kill() error { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index ae59d611..456f4bea 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -2,7 +2,6 @@ package worker import ( "bytes" - "context" "fmt" "io" "os" @@ -30,13 +29,6 @@ const ( ReadBufSize = 10240 // Kb ) -var syncPool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, -} - // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. @@ -79,6 +71,8 @@ type Process struct { rd io.Reader // stop signal terminates io.Pipe from reading from stderr stop chan struct{} + + syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -93,6 +87,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { state: internal.NewWorkerState(internal.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), + // sync pool for STDERR + // All receivers are pointers + syncPool: sync.Pool{ + New: func() interface{} { + buf := make([]byte, ReadBufSize) + return &buf + }, + }, } w.rd, w.cmd.Stderr = io.Pipe() @@ -217,30 +219,16 @@ func (w *Process) closeRelay() error { } // Stop sends soft termination command to the Process and waits for process completion. -func (w *Process) Stop(ctx context.Context) error { - c := make(chan error) - - go func() { - var err error - w.state.Set(internal.StateStopping) - err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) - if err != nil { - w.state.Set(internal.StateKilling) - c <- multierr.Append(err, w.cmd.Process.Kill()) - } - w.state.Set(internal.StateStopped) - c <- nil - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - if err != nil { - return err - } - return nil +func (w *Process) Stop() error { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + return multierr.Append(err, w.cmd.Process.Kill()) } + w.state.Set(internal.StateStopped) + return nil } // Kill kills underlying process, make sure to call Wait() func to gather @@ -258,15 +246,12 @@ func (w *Process) Kill() error { // put the pointer, to not allocate new slice // but erase it len and then return back func (w *Process) put(data *[]byte) { - *data = (*data)[:0] - *data = (*data)[:cap(*data)] - - syncPool.Put(data) + w.syncPool.Put(data) } // get pointer to the byte slice func (w *Process) get() *[]byte { - return syncPool.Get().(*[]byte) + return w.syncPool.Get().(*[]byte) } // Write appends the contents of pool to the errBuffer, growing the errBuffer as @@ -282,6 +267,7 @@ func (w *Process) watch() { w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) w.mu.Lock() // write new message + // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool w.stderr.Write((*buf)[:n]) w.mu.Unlock() w.put(buf) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 8788e509..918145e5 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" - syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" ) type Stack struct { @@ -163,16 +162,12 @@ type workerWatcher struct { func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { - sw, err := syncWorker.From(workers[i]) - if err != nil { - return err - } - ww.stack.Push(sw) - sw.AddListener(ww.events.Push) + ww.stack.Push(workers[i]) + workers[i].AddListener(ww.events.Push) go func(swc worker.BaseProcess) { ww.wait(swc) - }(sw) + }(workers[i]) } return nil } diff --git a/plugins/checker/plugin.go b/plugins/checker/plugin.go index e6250697..e3e7834a 100644 --- a/plugins/checker/plugin.go +++ b/plugins/checker/plugin.go @@ -9,9 +9,9 @@ import ( "github.com/gofiber/fiber/v2/middleware/logger" "github.com/spiral/endure" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/status" - "github.com/spiral/roadrunner/v2/plugins/config" ) const ( @@ -26,7 +26,7 @@ type Plugin struct { cfg *Config } -func (c *Plugin) Init(log log.Logger, cfg config.Configurer) error { +func (c *Plugin) Init(log log.Logger, cfg config2.Configurer) error { const op = errors.Op("status plugin init") err := cfg.UnmarshalKey(PluginName, &c.cfg) if err != nil { diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go index 2555d28a..4cde314d 100755 --- a/plugins/config/plugin.go +++ b/plugins/config/plugin.go @@ -1,17 +1,18 @@ package config import ( + "bytes" "errors" - "fmt" "strings" "github.com/spf13/viper" ) type Viper struct { - viper *viper.Viper - Path string - Prefix string + viper *viper.Viper + Path string + Prefix string + ReadInCfg []byte } // Inits config provider. @@ -32,17 +33,16 @@ func (v *Viper) Init() error { v.viper.SetConfigFile(v.Path) v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + if v.ReadInCfg != nil { + return v.viper.ReadConfig(bytes.NewBuffer(v.ReadInCfg)) + } return v.viper.ReadInConfig() } // Overwrite overwrites existing config with provided values -func (v *Viper) Overwrite(values map[string]string) error { +func (v *Viper) Overwrite(values map[string]interface{}) error { if len(values) != 0 { - for _, flag := range values { - key, value, err := parseFlag(flag) - if err != nil { - return err - } + for key, value := range values { v.viper.Set(key, value) } } @@ -68,24 +68,3 @@ func (v *Viper) Get(name string) interface{} { func (v *Viper) Has(name string) bool { return v.viper.IsSet(name) } - -func parseFlag(flag string) (string, string, error) { - if !strings.Contains(flag, "=") { - return "", "", fmt.Errorf("invalid flag `%s`", flag) - } - - parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2) - - return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil -} - -func parseValue(value string) string { - escape := []rune(value)[0] - - if escape == '"' || escape == '\'' || escape == '`' { - value = strings.Trim(value, string(escape)) - value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape)) - } - - return value -} diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go index a276c15f..7b5d6bd8 100755 --- a/plugins/config/tests/plugin1.go +++ b/plugins/config/tests/plugin1.go @@ -4,7 +4,7 @@ import ( "errors" "time" - "github.com/spiral/roadrunner/v2/plugins/config" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" ) // ReloadConfig is a Reload configuration point. @@ -23,11 +23,11 @@ type ServiceConfig struct { } type Foo struct { - configProvider config.Configurer + configProvider config2.Configurer } // Depends on S2 and DB (S3 in the current case) -func (f *Foo) Init(p config.Configurer) error { +func (f *Foo) Init(p config2.Configurer) error { f.configProvider = p return nil } diff --git a/plugins/headers/plugin.go b/plugins/headers/plugin.go index f1c6e6f3..e16f6187 100644 --- a/plugins/headers/plugin.go +++ b/plugins/headers/plugin.go @@ -5,7 +5,7 @@ import ( "strconv" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" ) // ID contains default service name. @@ -20,7 +20,7 @@ type Plugin struct { // Init must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer) error { +func (s *Plugin) Init(cfg config2.Configurer) error { const op = errors.Op("headers plugin init") err := cfg.UnmarshalKey(RootPluginName, &s.cfg) if err != nil { diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 460263f6..a883735a 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/spiral/endure" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/pool" @@ -22,7 +23,6 @@ import ( "github.com/spiral/roadrunner/v2/interfaces/status" "github.com/spiral/roadrunner/v2/interfaces/worker" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/util" "golang.org/x/net/http2" @@ -49,7 +49,7 @@ type middleware map[string]Middleware type Plugin struct { sync.Mutex - configurer config.Configurer + configurer config2.Configurer server server.Server log log.Logger @@ -80,7 +80,7 @@ func (s *Plugin) AddListener(listener events.EventListener) { // Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error { +func (s *Plugin) Init(cfg config2.Configurer, log log.Logger, server server.Server) error { const op = errors.Op("http Init") err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { diff --git a/plugins/http/tests/plugin1.go b/plugins/http/tests/plugin1.go index 1cbca744..7d1f32a1 100644 --- a/plugins/http/tests/plugin1.go +++ b/plugins/http/tests/plugin1.go @@ -1,12 +1,14 @@ package tests -import "github.com/spiral/roadrunner/v2/plugins/config" +import ( + config2 "github.com/spiral/roadrunner/v2/interfaces/config" +) type Plugin1 struct { - config config.Configurer + config config2.Configurer } -func (p1 *Plugin1) Init(cfg config.Configurer) error { +func (p1 *Plugin1) Init(cfg config2.Configurer) error { p1.config = cfg return nil } diff --git a/plugins/http/tests/plugin_middleware.go b/plugins/http/tests/plugin_middleware.go index de829d34..224d4117 100644 --- a/plugins/http/tests/plugin_middleware.go +++ b/plugins/http/tests/plugin_middleware.go @@ -3,14 +3,14 @@ package tests import ( "net/http" - "github.com/spiral/roadrunner/v2/plugins/config" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" ) type PluginMiddleware struct { - config config.Configurer + config config2.Configurer } -func (p *PluginMiddleware) Init(cfg config.Configurer) error { +func (p *PluginMiddleware) Init(cfg config2.Configurer) error { p.config = cfg return nil } @@ -34,10 +34,10 @@ func (p *PluginMiddleware) Name() string { } type PluginMiddleware2 struct { - config config.Configurer + config config2.Configurer } -func (p *PluginMiddleware2) Init(cfg config.Configurer) error { +func (p *PluginMiddleware2) Init(cfg config2.Configurer) error { p.config = cfg return nil } diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go index 3fdefde3..80627801 100644 --- a/plugins/informer/tests/test_plugin.go +++ b/plugins/informer/tests/test_plugin.go @@ -4,10 +4,10 @@ import ( "context" "time" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/interfaces/worker" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/config" ) var testPoolConfig = poolImpl.Config{ @@ -26,11 +26,11 @@ var testPoolConfig = poolImpl.Config{ // Gauge ////////////// type Plugin1 struct { - config config.Configurer + config config2.Configurer server server.Server } -func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error { +func (p1 *Plugin1) Init(cfg config2.Configurer, server server.Server) error { p1.config = cfg p1.server = server return nil diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go index 64b77a64..ec58b7d6 100644 --- a/plugins/logger/plugin.go +++ b/plugins/logger/plugin.go @@ -2,8 +2,8 @@ package logger import ( "github.com/spiral/endure" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/plugins/config" "go.uber.org/zap" ) @@ -18,7 +18,7 @@ type ZapLogger struct { } // Init logger service. -func (z *ZapLogger) Init(cfg config.Configurer) error { +func (z *ZapLogger) Init(cfg config2.Configurer) error { err := cfg.UnmarshalKey(PluginName, &z.cfg) if err != nil { return err diff --git a/plugins/logger/tests/plugin.go b/plugins/logger/tests/plugin.go index 32238f63..4095e59d 100644 --- a/plugins/logger/tests/plugin.go +++ b/plugins/logger/tests/plugin.go @@ -2,16 +2,16 @@ package tests import ( "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/plugins/config" ) type Plugin struct { - config config.Configurer + config config2.Configurer log log.Logger } -func (p1 *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (p1 *Plugin) Init(cfg config2.Configurer, log log.Logger) error { p1.config = cfg p1.log = log return nil diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index c115826b..956166ee 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -11,9 +11,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spiral/endure" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/metrics" - "github.com/spiral/roadrunner/v2/plugins/config" "golang.org/x/sys/cpu" ) @@ -40,7 +40,7 @@ type Plugin struct { } // Init service. -func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (m *Plugin) Init(cfg config2.Configurer, log log.Logger) error { const op = errors.Op("Metrics Init") err := cfg.UnmarshalKey(PluginName, &m.cfg) if err != nil { diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go index b48c415d..08dd2593 100644 --- a/plugins/metrics/tests/plugin1.go +++ b/plugins/metrics/tests/plugin1.go @@ -2,15 +2,15 @@ package tests import ( "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/roadrunner/v2/plugins/config" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" ) // Gauge ////////////// type Plugin1 struct { - config config.Configurer + config config2.Configurer } -func (p1 *Plugin1) Init(cfg config.Configurer) error { +func (p1 *Plugin1) Init(cfg config2.Configurer) error { p1.config = cfg return nil } diff --git a/plugins/redis/config.go b/plugins/redis/config.go new file mode 100644 index 00000000..ebcefed1 --- /dev/null +++ b/plugins/redis/config.go @@ -0,0 +1,32 @@ +package redis + +import "time" + +type Config struct { + Addrs []string `yaml:"addrs"` + DB int `yaml:"db"` + Username string `yaml:"username"` + Password string `yaml:"password"` + MasterName string `yaml:"master_name"` + SentinelPassword string `yaml:"sentinel_password"` + RouteByLatency bool `yaml:"route_by_latency"` + RouteRandomly bool `yaml:"route_randomly"` + MaxRetries int `yaml:"max_retries"` + DialTimeout time.Duration `yaml:"dial_timeout"` + MinRetryBackoff time.Duration `yaml:"min_retry_backoff"` + MaxRetryBackoff time.Duration `yaml:"max_retry_backoff"` + PoolSize int `yaml:"pool_size"` + MinIdleConns int `yaml:"min_idle_conns"` + MaxConnAge time.Duration `yaml:"max_conn_age"` + ReadTimeout time.Duration `yaml:"read_timeout"` + WriteTimeout time.Duration `yaml:"write_timeout"` + PoolTimeout time.Duration `yaml:"pool_timeout"` + IdleTimeout time.Duration `yaml:"idle_timeout"` + IdleCheckFreq time.Duration `yaml:"idle_check_freq"` + ReadOnly bool `yaml:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage +} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go new file mode 100644 index 00000000..08bb7972 --- /dev/null +++ b/plugins/redis/plugin.go @@ -0,0 +1,75 @@ +package redis + +import ( + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/config" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +const PluginName = "redis" + +type Plugin struct { + // config for RR integration + cfg *Config + // logger + log log.Logger + // redis universal client + universalClient redis.UniversalClient +} + +func (s *Plugin) GetClient() redis.UniversalClient { + return s.universalClient +} + +func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("redis plugin init") + s.cfg = &Config{} + s.cfg.InitDefaults() + + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + s.log = log + + s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: s.cfg.Addrs, + DB: s.cfg.DB, + Username: s.cfg.Username, + Password: s.cfg.Password, + SentinelPassword: s.cfg.SentinelPassword, + MaxRetries: s.cfg.MaxRetries, + MinRetryBackoff: s.cfg.MaxRetryBackoff, + MaxRetryBackoff: s.cfg.MaxRetryBackoff, + DialTimeout: s.cfg.DialTimeout, + ReadTimeout: s.cfg.ReadTimeout, + WriteTimeout: s.cfg.WriteTimeout, + PoolSize: s.cfg.PoolSize, + MinIdleConns: s.cfg.MinIdleConns, + MaxConnAge: s.cfg.MaxConnAge, + PoolTimeout: s.cfg.PoolTimeout, + IdleTimeout: s.cfg.IdleTimeout, + IdleCheckFrequency: s.cfg.IdleCheckFreq, + ReadOnly: s.cfg.ReadOnly, + RouteByLatency: s.cfg.RouteByLatency, + RouteRandomly: s.cfg.RouteRandomly, + MasterName: s.cfg.MasterName, + }) + + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (s Plugin) Stop() error { + return s.universalClient.Close() +} + +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/redis/tests/plugin1.go b/plugins/redis/tests/plugin1.go new file mode 100644 index 00000000..e19ca90a --- /dev/null +++ b/plugins/redis/tests/plugin1.go @@ -0,0 +1,43 @@ +package tests + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + redisPlugin "github.com/spiral/roadrunner/v2/interfaces/redis" +) + +type Plugin1 struct { + redisClient redis.UniversalClient +} + +func (p *Plugin1) Init(redis redisPlugin.Redis) error { + p.redisClient = redis.GetClient() + return nil +} + +func (p *Plugin1) Serve() chan error { + const op = errors.Op("plugin1 serve") + errCh := make(chan error, 1) + p.redisClient.Set(context.Background(), "foo", "bar", time.Minute) + + stringCmd := p.redisClient.Get(context.Background(), "foo") + data, err := stringCmd.Result() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + if data != "bar" { + errCh <- errors.E(op, errors.Str("no such key")) + return errCh + } + + return errCh +} + +func (p *Plugin1) Stop() error { + return p.redisClient.Close() +} diff --git a/plugins/redis/tests/redis_plugin_test.go b/plugins/redis/tests/redis_plugin_test.go new file mode 100644 index 00000000..8f8da983 --- /dev/null +++ b/plugins/redis/tests/redis_plugin_test.go @@ -0,0 +1,124 @@ +package tests + +import ( + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/golang/mock/gomock" + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/mocks" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/redis" + "github.com/stretchr/testify/assert" +) + +func redisConfig(port string) string { + cfg := ` +redis: + addrs: + - 'localhost:%s' + master_name: '' + username: '' + password: '' + db: 0 + sentinel_password: '' + route_by_latency: false + route_randomly: false + dial_timeout: 0 + max_retries: 1 + min_retry_backoff: 0 + max_retry_backoff: 0 + pool_size: 0 + min_idle_conns: 0 + max_conn_age: 0 + read_timeout: 0 + write_timeout: 0 + pool_timeout: 0 + idle_timeout: 0 + idle_check_freq: 0 + read_only: false +` + return fmt.Sprintf(cfg, port) +} + +func TestRedisInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + + s, err := miniredis.Run() + if err != nil { + panic(err) + } + defer s.Close() + + c := redisConfig(s.Port()) + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = ".rr-redis.yaml" + cfg.ReadInCfg = []byte(c) + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + err = cont.RegisterAll( + cfg, + mockLogger, + &redis.Plugin{}, + &Plugin1{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + tt := time.NewTimer(time.Second * 10) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + wg.Wait() +} diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go index 555ddb82..233c83a4 100644 --- a/plugins/reload/plugin.go +++ b/plugins/reload/plugin.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/resetter" - "github.com/spiral/roadrunner/v2/plugins/config" ) // PluginName contains default plugin name. @@ -25,7 +25,7 @@ type Plugin struct { } // Init controller service -func (s *Plugin) Init(cfg config.Configurer, log log.Logger, res resetter.Resetter) error { +func (s *Plugin) Init(cfg config2.Configurer, log log.Logger, res resetter.Resetter) error { const op = errors.Op("reload plugin init") s.cfg = &Config{} InitDefaults(s.cfg) diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go index 1d770e70..f1c09caf 100644 --- a/plugins/resetter/tests/test_plugin.go +++ b/plugins/resetter/tests/test_plugin.go @@ -4,9 +4,9 @@ import ( "context" "time" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/server" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/plugins/config" ) var testPoolConfig = poolImpl.Config{ @@ -25,11 +25,11 @@ var testPoolConfig = poolImpl.Config{ // Gauge ////////////// type Plugin1 struct { - config config.Configurer + config config2.Configurer server server.Server } -func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error { +func (p1 *Plugin1) Init(cfg config2.Configurer, server server.Server) error { p1.config = cfg p1.server = server return nil diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 98242ade..d0dc0ff1 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -8,9 +8,9 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" ) // PluginName contains default plugin name. diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go index 79e98ed4..dcb256fa 100644 --- a/plugins/rpc/tests/plugin1.go +++ b/plugins/rpc/tests/plugin1.go @@ -3,14 +3,14 @@ package tests import ( "fmt" - "github.com/spiral/roadrunner/v2/plugins/config" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" ) type Plugin1 struct { - config config.Configurer + config config2.Configurer } -func (p1 *Plugin1) Init(cfg config.Configurer) error { +func (p1 *Plugin1) Init(cfg config2.Configurer) error { p1.config = cfg return nil } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e6003fbc..580c1e10 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/interfaces/pool" @@ -16,7 +17,6 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pipe" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/socket" - "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) @@ -30,7 +30,7 @@ type Plugin struct { } // Init application provider. -func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (server *Plugin) Init(cfg config2.Configurer, log log.Logger) error { const op = errors.Op("Init") err := cfg.UnmarshalKey(PluginName, &server.cfg) if err != nil { @@ -62,7 +62,7 @@ func (server *Plugin) Stop() error { return nil } - return server.factory.Close(context.Background()) + return server.factory.Close() } // CmdFactory provides worker command factory assocated with given context. @@ -105,7 +105,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.Bas return nil, errors.E(op, err) } - w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd()) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index 9a8a630c..f49cf6dc 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -5,12 +5,12 @@ import ( "time" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/pkg/payload" poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) @@ -32,12 +32,12 @@ var testPoolConfig = poolImpl.Config{ } type Foo struct { - configProvider config.Configurer + configProvider config2.Configurer wf server.Server pool pool.Pool } -func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error { +func (f *Foo) Init(p config2.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil @@ -99,7 +99,7 @@ func (f *Foo) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index b1545718..ee971e45 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -4,21 +4,21 @@ import ( "context" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) type Foo2 struct { - configProvider config.Configurer + configProvider config2.Configurer wf server.Server pool pool.Pool } -func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error { +func (f *Foo2) Init(p config2.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil @@ -79,7 +79,7 @@ func (f *Foo2) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index da92288a..cdf23e21 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -4,21 +4,21 @@ import ( "context" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/pool" "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/config" plugin "github.com/spiral/roadrunner/v2/plugins/server" ) type Foo3 struct { - configProvider config.Configurer + configProvider config2.Configurer wf server.Server pool pool.Pool } -func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error { +func (f *Foo3) Init(p config2.Configurer, workerFactory server.Server) error { f.configProvider = p f.wf = workerFactory return nil @@ -79,7 +79,7 @@ func (f *Foo3) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go index cf5cee25..fd8d0a9c 100644 --- a/plugins/static/plugin.go +++ b/plugins/static/plugin.go @@ -5,8 +5,8 @@ import ( "path" "github.com/spiral/errors" + config2 "github.com/spiral/roadrunner/v2/interfaces/config" "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/plugins/config" ) // ID contains default service name. @@ -27,7 +27,7 @@ type Plugin struct { // Init must return configure service and return true if service hasStatus enabled. Must return error in case of // misconfiguration. Services must not be used without proper configuration pushed first. -func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error { +func (s *Plugin) Init(cfg config2.Configurer, log log.Logger) error { const op = errors.Op("static plugin init") err := cfg.UnmarshalKey(RootPluginName, &s.cfg) if err != nil { |