summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-21 14:24:45 +0300
committerGitHub <[email protected]>2020-12-21 14:24:45 +0300
commit8543980775e5f8b12e5e200a0764052cdb4350a5 (patch)
treec1c6dff8e6bd81bcf51d608c5ed935702911ae81
parentfd6e9cc403fc0c3857dcf29768429a374bd85636 (diff)
parent7b32b6b93576ec72b4b7fdf2068e655f869e9cf8 (diff)
Merge pull request #453 from spiral/plugin/redis
Plugin/redis
-rw-r--r--.github/workflows/build.yml4
-rwxr-xr-x.rr.yaml74
-rwxr-xr-xMakefile2
-rwxr-xr-xgo.mod2
-rwxr-xr-xgo.sum37
-rwxr-xr-xinterfaces/config/interface.go (renamed from plugins/config/configurer.go)3
-rwxr-xr-xinterfaces/factory/factory.go22
-rw-r--r--interfaces/pool/pool.go3
-rw-r--r--interfaces/redis/interface.go9
-rw-r--r--interfaces/worker/factory.go4
-rw-r--r--interfaces/worker/worker.go4
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pipe/pipe_factory_test.go72
-rwxr-xr-xpkg/pool/static_pool.go55
-rwxr-xr-xpkg/pool/static_pool_test.go1
-rwxr-xr-xpkg/socket/socket_factory.go4
-rwxr-xr-xpkg/socket/socket_factory_test.go58
-rwxr-xr-xpkg/worker/sync_worker.go9
-rwxr-xr-xpkg/worker/worker.go58
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go11
-rw-r--r--plugins/checker/plugin.go4
-rwxr-xr-xplugins/config/plugin.go41
-rwxr-xr-xplugins/config/tests/plugin1.go6
-rw-r--r--plugins/headers/plugin.go4
-rw-r--r--plugins/http/plugin.go6
-rw-r--r--plugins/http/tests/plugin1.go8
-rw-r--r--plugins/http/tests/plugin_middleware.go10
-rw-r--r--plugins/informer/tests/test_plugin.go6
-rw-r--r--plugins/logger/plugin.go4
-rw-r--r--plugins/logger/tests/plugin.go6
-rw-r--r--plugins/metrics/plugin.go4
-rw-r--r--plugins/metrics/tests/plugin1.go6
-rw-r--r--plugins/redis/config.go32
-rw-r--r--plugins/redis/plugin.go75
-rw-r--r--plugins/redis/tests/plugin1.go43
-rw-r--r--plugins/redis/tests/redis_plugin_test.go124
-rw-r--r--plugins/reload/plugin.go4
-rw-r--r--plugins/resetter/tests/test_plugin.go6
-rwxr-xr-xplugins/rpc/plugin.go2
-rw-r--r--plugins/rpc/tests/plugin1.go6
-rw-r--r--plugins/server/plugin.go8
-rw-r--r--plugins/server/tests/plugin_pipes.go8
-rw-r--r--plugins/server/tests/plugin_sockets.go8
-rw-r--r--plugins/server/tests/plugin_tcp.go8
-rw-r--r--plugins/static/plugin.go4
45 files changed, 607 insertions, 262 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index d6644450..51c49af7 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -65,6 +65,7 @@ jobs:
- name: Run golang tests on Windows without codecov
if: ${{ matrix.os == 'windows-latest' }}
run: |
+ go test -v -race -cover -tags=debug ./util
go test -v -race -cover -tags=debug ./pkg/pipe
go test -v -race -cover -tags=debug ./pkg/pool
go test -v -race -cover -tags=debug ./pkg/socket
@@ -84,12 +85,14 @@ jobs:
go test -v -race -cover -tags=debug ./plugins/static/tests
go test -v -race -cover -tags=debug ./plugins/static
go test -v -race -cover -tags=debug ./plugins/headers/tests
+ go test -v -race -cover -tags=debug ./plugins/redis/tests
go test -v -race -cover -tags=debug ./plugins/checker/tests
- name: Run golang tests on Linux and MacOS
if: ${{ matrix.os == 'ubuntu-20.04' || matrix.os == 'macos-latest' }}
run: |
mkdir ./coverage-ci
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/util.txt -covermode=atomic ./util
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/pipe
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/socket
@@ -110,6 +113,7 @@ jobs:
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/static_root.txt -covermode=atomic ./plugins/static
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./plugins/headers/tests
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/checker.txt -covermode=atomic ./plugins/checker/tests
+ go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./plugins/redis/tests
go test -v -race -cover -tags=debug -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./plugins/reload/tests
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
diff --git a/.rr.yaml b/.rr.yaml
index 5caff422..3e8b267d 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -15,7 +15,7 @@ http:
debug: true
address: 127.0.0.1:18903
maxRequestSize: 1024
- middleware: [ "" ]
+ middleware: [ "gzip", "headers" ]
uploads:
forbid: [ ".php", ".exe", ".bat" ]
trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -36,4 +36,74 @@ http:
http2:
enabled: false
h2c: false
- maxConcurrentStreams: 128 \ No newline at end of file
+ maxConcurrentStreams: 128
+
+redis:
+ # UniversalClient is an abstract client which - based on the provided options -
+ # can connect to either clusters, or sentinel-backed failover instances
+ # or simple single-instance servers. This can be useful for testing
+ # cluster-specific applications locally.
+ # if the number of addrs is 1 and master_name is empty, a single-node redis Client will be returned
+
+ # if the number of Addrs is two or more, a ClusterClient will be returned
+ addrs:
+ - 'localhost:6379'
+ # if a MasterName is passed a sentinel-backed FailoverClient will be returned
+ master_name: ''
+ username: ''
+ password: ''
+ db: 0
+ sentinel_password: ''
+ route_by_latency: false
+ route_randomly: false
+ dial_timeout: 0 # accepted values [1s, 5m, 3h]
+ max_retries: 1
+ min_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ max_retry_backoff: 0 # accepted values [1s, 5m, 3h]
+ pool_size: 0
+ min_idle_conns: 0
+ max_conn_age: 0 # accepted values [1s, 5m, 3h]
+ read_timeout: 0 # accepted values [1s, 5m, 3h]
+ write_timeout: 0 # accepted values [1s, 5m, 3h]
+ pool_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_timeout: 0 # accepted values [1s, 5m, 3h]
+ idle_check_freq: 0 # accepted values [1s, 5m, 3h]
+ read_only: false
+
+metrics:
+ # prometheus client address (path /metrics added automatically)
+ address: localhost:2112
+ collect:
+ app_metric:
+ type: histogram
+ help: "Custom application metric"
+ labels: [ "type" ]
+ buckets: [ 0.1, 0.2, 0.3, 1.0 ]
+ # objectives defines the quantile rank estimates with their respective
+ # absolute error [ for summary only ]
+ objectives:
+ - 1.4: 2.3
+ - 2.0: 1.4
+
+reload:
+ # sync interval
+ interval: 1s
+ # global patterns to sync
+ patterns: [ ".php" ]
+ # list of included for sync services
+ services:
+ http:
+ # recursive search for file patterns to add
+ recursive: true
+ # ignored folders
+ ignore: [ "vendor" ]
+ # service specific file pattens to sync
+ patterns: [ ".php", ".go",".md", ]
+ # directories to sync. If recursive is set to true,
+ # recursive sync will be applied only to the directories in `dirs` section
+ dirs: [ "." ]
+ rpc:
+ recursive: true
+ patterns: [ ".json" ]
+ # to include all project directories from workdir, leave `dirs` empty or add a dot "."
+ dirs: [ "" ] \ No newline at end of file
diff --git a/Makefile b/Makefile
index 9f86fa4a..c216bb3d 100755
--- a/Makefile
+++ b/Makefile
@@ -24,6 +24,7 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test: ## Run application tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./util
go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pipe
go test -v -race -cover -tags=debug -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -covermode=atomic ./pkg/socket
@@ -43,6 +44,7 @@ test: ## Run application tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/static/tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/headers/tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/checker/tests
+ go test -v -race -cover -tags=debug -covermode=atomic ./plugins/redis/tests
go test -v -race -cover -tags=debug -covermode=atomic ./plugins/reload/tests
lint: ## Run application linters
diff --git a/go.mod b/go.mod
index f8da7823..5c5d9061 100755
--- a/go.mod
+++ b/go.mod
@@ -5,8 +5,10 @@ go 1.15
require (
github.com/NYTimes/gziphandler v1.1.1
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
+ github.com/alicebob/miniredis/v2 v2.14.1
github.com/fatih/color v1.10.0
github.com/go-ole/go-ole v1.2.4 // indirect
+ github.com/go-redis/redis/v8 v8.4.4
github.com/gofiber/fiber/v2 v2.3.0
github.com/golang/mock v1.4.4
github.com/hashicorp/go-multierror v1.1.0
diff --git a/go.sum b/go.sum
index eaf0d7bc..348e6043 100755
--- a/go.sum
+++ b/go.sum
@@ -29,6 +29,11 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
+github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
+github.com/alicebob/miniredis/v2 v2.14.1 h1:GjlbSeoJ24bzdLRs13HoMEeaRZx9kg5nHoRW7QV/nCs=
+github.com/alicebob/miniredis/v2 v2.14.1/go.mod h1:uS970Sw5Gs9/iK3yBg0l9Uj9s25wXxSpQUE9EaJ/Blg=
github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
@@ -57,6 +62,9 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@@ -75,6 +83,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
@@ -92,6 +102,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
@@ -103,6 +115,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
+github.com/go-redis/redis/v8 v8.4.4 h1:fGqgxCTR1sydaKI00oQf3OmkU/DIe/I/fYXvGklCIuc=
+github.com/go-redis/redis/v8 v8.4.4/go.mod h1:nA0bQuF0i5JFx4Ta9RZxGKXFrQ8cRWntra97f0196iY=
github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -143,6 +157,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -266,13 +282,22 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M=
+github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U=
+github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
@@ -411,6 +436,8 @@ github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6Ac
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yookoala/gofast v0.4.0 h1:dLBjghcsbbZNOEHN8N1X/gh9S6srmJed4WQfG7DlKwo=
github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
+github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
+github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
@@ -419,6 +446,8 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/otel v0.15.0 h1:CZFy2lPhxd4HlhZnYK8gRyDotksO3Ip9rBweY1vVYJw=
+go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -483,8 +512,10 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201216054612-986b41b23924 h1:QsnDpLLOKwHBBDa8nDws4DYNc/ryVW2vCpxCs09d4PY=
golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -508,6 +539,7 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -519,12 +551,16 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -629,6 +665,7 @@ gopkg.in/ini.v1 v1.38.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
diff --git a/plugins/config/configurer.go b/interfaces/config/interface.go
index 00010eae..2a7c67ce 100755
--- a/plugins/config/configurer.go
+++ b/interfaces/config/interface.go
@@ -14,6 +14,9 @@ type Configurer interface {
// Get used to get config section
Get(name string) interface{}
+ // Overwrite used to overwrite particular values in the unmarshalled config
+ Overwrite(values map[string]interface{}) error
+
// Has checks if config section exists.
Has(name string) bool
}
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go
deleted file mode 100755
index 51b73501..00000000
--- a/interfaces/factory/factory.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package worker
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/roadrunner/v2/interfaces/worker"
-)
-
-// Factory is responsible of wrapping given command into tasks WorkerProcess.
-type Factory interface {
- // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex.
- // Process must not be started.
- SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error)
-
- // SpawnWorker creates new WorkerProcess process based on given command.
- // Process must not be started.
- SpawnWorker(*exec.Cmd) (worker.BaseProcess, error)
-
- // Close the factory and underlying connections.
- Close(ctx context.Context) error
-}
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
index 72da9597..22552388 100644
--- a/interfaces/pool/pool.go
+++ b/interfaces/pool/pool.go
@@ -18,9 +18,10 @@ type Pool interface {
// GetConfig returns pool configuration.
GetConfig() interface{}
- // Exec
+ // Exec executes task with payload
Exec(rqs payload.Payload) (payload.Payload, error)
+ // ExecWithContext executes task with context which is used with timeout
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
diff --git a/interfaces/redis/interface.go b/interfaces/redis/interface.go
new file mode 100644
index 00000000..909c8ca4
--- /dev/null
+++ b/interfaces/redis/interface.go
@@ -0,0 +1,9 @@
+package redis
+
+import "github.com/go-redis/redis/v8"
+
+// Redis in the redis KV plugin interface
+type Redis interface {
+ // GetClient
+ GetClient() redis.UniversalClient
+}
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
index 19e2bf5d..8db8ddcc 100644
--- a/interfaces/worker/factory.go
+++ b/interfaces/worker/factory.go
@@ -9,10 +9,10 @@ import (
type Factory interface {
// SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
SpawnWorker(*exec.Cmd) (BaseProcess, error)
// Close the factory and underlying connections.
- Close(ctx context.Context) error
+ Close() error
}
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
index f830fdf2..7f2f8a53 100644
--- a/interfaces/worker/worker.go
+++ b/interfaces/worker/worker.go
@@ -40,7 +40,7 @@ type BaseProcess interface {
Wait() error
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop(ctx context.Context) error
+ Stop() error
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
@@ -59,5 +59,5 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
// ExecWithContext used to handle Exec with TTL
- ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index 34735fe6..a0e0c258 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -31,7 +31,7 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
@@ -159,6 +159,6 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close the factory.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return nil
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 40797747..0d548b7a 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -20,7 +20,7 @@ func Test_GetState(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, internal.StateStopped, w.State().Value())
@@ -30,7 +30,7 @@ func Test_GetState(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, internal.StateReady, w.State().Value())
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -73,7 +73,7 @@ func Test_Pipe_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- assert.NoError(t, w.Stop(ctx))
+ assert.NoError(t, w.Stop())
}
func Test_Pipe_StartError(t *testing.T) {
@@ -84,7 +84,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -97,7 +97,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -110,7 +110,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
@@ -128,7 +128,7 @@ func Test_Pipe_Failboot(t *testing.T) {
func Test_Pipe_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -136,12 +136,12 @@ func Test_Pipe_Invalid(t *testing.T) {
func Test_Pipe_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -165,13 +165,13 @@ func Test_Pipe_Echo(t *testing.T) {
func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -191,14 +191,14 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
go func() {
if w.Wait() != nil {
b.Fail()
}
}()
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -208,7 +208,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
sw, err := workerImpl.From(w)
if err != nil {
b.Fatal(err)
@@ -222,7 +222,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}
}()
defer func() {
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -238,13 +238,13 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -265,13 +265,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -293,7 +293,7 @@ func Test_Echo(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -306,7 +306,7 @@ func Test_Echo(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -326,7 +326,7 @@ func Test_BadPayload(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
syncWorker, err := workerImpl.From(w)
if err != nil {
@@ -337,7 +337,7 @@ func Test_BadPayload(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -356,12 +356,12 @@ func Test_String(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -376,12 +376,12 @@ func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -406,7 +406,7 @@ func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -437,20 +437,20 @@ func Test_Broken(t *testing.T) {
t.Fail()
}
mu.Unlock()
- assert.Error(t, w.Stop(ctx))
+ assert.Error(t, w.Stop())
}
func Test_Error(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -476,12 +476,12 @@ func Test_NumExecs(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 9cf79fd4..2a06b255 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -3,6 +3,7 @@ package pool
import (
"context"
"os/exec"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
@@ -18,8 +19,6 @@ import (
// StopRequest can be sent by worker to indicate that restart is required.
const StopRequest = "{\"stop\":true}"
-var bCtx = context.Background()
-
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
@@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
before: make([]Before, 0, 0),
}
- p.allocator = newPoolAllocator(factory, cmd)
+ p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ err = sw.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
@@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- sw := w.(worker.SyncWorker)
-
// apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
- rsp, err := sw.ExecWithContext(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, rqs)
if err != nil {
- return sp.errEncoder(err, sw)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
}
- if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(sw)
+ sp.ww.PushWorker(w)
}
// apply all after functions
@@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w, nil
+ return w.(worker.SyncWorker), nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- err = w.Stop(bCtx)
+ err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
@@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(internal.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
+ errS := w.Stop()
if errS != nil {
return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
@@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.BaseProcess, error) {
- w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
if err != nil {
return nil, err
}
@@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
@@ -334,20 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
- w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ w, err := sp.allocator()
if err != nil {
- cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- workers = append(workers, w)
- cancel()
+
+ sw, err := syncWorker.From(w)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ workers = append(workers, sw)
}
return workers, nil
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b96e9214..30345aee 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -190,6 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
+ time.Sleep(time.Second)
res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index b08d24e4..49456bd9 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -85,7 +85,7 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
@@ -174,7 +174,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close socket factory and underlying socket connection.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return f.ls.Close()
}
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index 6a88713a..983f3e8e 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -31,7 +31,7 @@ func Test_Tcp_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -39,7 +39,7 @@ func Test_Tcp_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -64,11 +64,11 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
}()
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -95,7 +95,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
assert.Contains(t, err2.Error(), "failboot")
@@ -141,7 +141,7 @@ func Test_Tcp_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -164,7 +164,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -186,7 +186,7 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -201,7 +201,7 @@ func Test_Tcp_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err2 := w.Stop(ctx)
+ err2 := w.Stop()
// write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
assert.Error(t, err2)
}()
@@ -235,12 +235,12 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -277,7 +277,7 @@ func Test_Unix_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -285,7 +285,7 @@ func Test_Unix_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -307,7 +307,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -329,7 +329,7 @@ func Test_Unix_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -351,7 +351,7 @@ func Test_Unix_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -372,7 +372,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -387,7 +387,7 @@ func Test_Unix_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -420,7 +420,7 @@ func Test_Unix_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -428,7 +428,7 @@ func Test_Unix_Echo(t *testing.T) {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -467,7 +467,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -475,7 +475,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
assert.NoError(b, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -498,12 +498,12 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -539,11 +539,11 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -566,12 +566,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index eacb8a8a..11992f22 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -63,9 +63,10 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithContext")
+func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("ExecWithTimeout")
c := make(chan wexec, 1)
+
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
@@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error {
return tw.w.Wait()
}
-func (tw *syncWorker) Stop(ctx context.Context) error {
- return tw.w.Stop(ctx)
+func (tw *syncWorker) Stop() error {
+ return tw.w.Stop()
}
func (tw *syncWorker) Kill() error {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index ae59d611..456f4bea 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -2,7 +2,6 @@ package worker
import (
"bytes"
- "context"
"fmt"
"io"
"os"
@@ -30,13 +29,6 @@ const (
ReadBufSize = 10240 // Kb
)
-var syncPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -79,6 +71,8 @@ type Process struct {
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
stop chan struct{}
+
+ syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -93,6 +87,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
}
w.rd, w.cmd.Stderr = io.Pipe()
@@ -217,30 +219,16 @@ func (w *Process) closeRelay() error {
}
// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(internal.StateStopping)
- err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
- if err != nil {
- w.state.Set(internal.StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(internal.StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
+func (w *Process) Stop() error {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ return multierr.Append(err, w.cmd.Process.Kill())
}
+ w.state.Set(internal.StateStopped)
+ return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
@@ -258,15 +246,12 @@ func (w *Process) Kill() error {
// put the pointer, to not allocate new slice
// but erase it len and then return back
func (w *Process) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- syncPool.Put(data)
+ w.syncPool.Put(data)
}
// get pointer to the byte slice
func (w *Process) get() *[]byte {
- return syncPool.Get().(*[]byte)
+ return w.syncPool.Get().(*[]byte)
}
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
@@ -282,6 +267,7 @@ func (w *Process) watch() {
w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
w.stderr.Write((*buf)[:n])
w.mu.Unlock()
w.put(buf)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 8788e509..918145e5 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
@@ -163,16 +162,12 @@ type workerWatcher struct {
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- sw, err := syncWorker.From(workers[i])
- if err != nil {
- return err
- }
- ww.stack.Push(sw)
- sw.AddListener(ww.events.Push)
+ ww.stack.Push(workers[i])
+ workers[i].AddListener(ww.events.Push)
go func(swc worker.BaseProcess) {
ww.wait(swc)
- }(sw)
+ }(workers[i])
}
return nil
}
diff --git a/plugins/checker/plugin.go b/plugins/checker/plugin.go
index e6250697..e3e7834a 100644
--- a/plugins/checker/plugin.go
+++ b/plugins/checker/plugin.go
@@ -9,9 +9,9 @@ import (
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/spiral/endure"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/status"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
const (
@@ -26,7 +26,7 @@ type Plugin struct {
cfg *Config
}
-func (c *Plugin) Init(log log.Logger, cfg config.Configurer) error {
+func (c *Plugin) Init(log log.Logger, cfg config2.Configurer) error {
const op = errors.Op("status plugin init")
err := cfg.UnmarshalKey(PluginName, &c.cfg)
if err != nil {
diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go
index 2555d28a..4cde314d 100755
--- a/plugins/config/plugin.go
+++ b/plugins/config/plugin.go
@@ -1,17 +1,18 @@
package config
import (
+ "bytes"
"errors"
- "fmt"
"strings"
"github.com/spf13/viper"
)
type Viper struct {
- viper *viper.Viper
- Path string
- Prefix string
+ viper *viper.Viper
+ Path string
+ Prefix string
+ ReadInCfg []byte
}
// Inits config provider.
@@ -32,17 +33,16 @@ func (v *Viper) Init() error {
v.viper.SetConfigFile(v.Path)
v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ if v.ReadInCfg != nil {
+ return v.viper.ReadConfig(bytes.NewBuffer(v.ReadInCfg))
+ }
return v.viper.ReadInConfig()
}
// Overwrite overwrites existing config with provided values
-func (v *Viper) Overwrite(values map[string]string) error {
+func (v *Viper) Overwrite(values map[string]interface{}) error {
if len(values) != 0 {
- for _, flag := range values {
- key, value, err := parseFlag(flag)
- if err != nil {
- return err
- }
+ for key, value := range values {
v.viper.Set(key, value)
}
}
@@ -68,24 +68,3 @@ func (v *Viper) Get(name string) interface{} {
func (v *Viper) Has(name string) bool {
return v.viper.IsSet(name)
}
-
-func parseFlag(flag string) (string, string, error) {
- if !strings.Contains(flag, "=") {
- return "", "", fmt.Errorf("invalid flag `%s`", flag)
- }
-
- parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2)
-
- return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil
-}
-
-func parseValue(value string) string {
- escape := []rune(value)[0]
-
- if escape == '"' || escape == '\'' || escape == '`' {
- value = strings.Trim(value, string(escape))
- value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape))
- }
-
- return value
-}
diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go
index a276c15f..7b5d6bd8 100755
--- a/plugins/config/tests/plugin1.go
+++ b/plugins/config/tests/plugin1.go
@@ -4,7 +4,7 @@ import (
"errors"
"time"
- "github.com/spiral/roadrunner/v2/plugins/config"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
)
// ReloadConfig is a Reload configuration point.
@@ -23,11 +23,11 @@ type ServiceConfig struct {
}
type Foo struct {
- configProvider config.Configurer
+ configProvider config2.Configurer
}
// Depends on S2 and DB (S3 in the current case)
-func (f *Foo) Init(p config.Configurer) error {
+func (f *Foo) Init(p config2.Configurer) error {
f.configProvider = p
return nil
}
diff --git a/plugins/headers/plugin.go b/plugins/headers/plugin.go
index f1c6e6f3..e16f6187 100644
--- a/plugins/headers/plugin.go
+++ b/plugins/headers/plugin.go
@@ -5,7 +5,7 @@ import (
"strconv"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
)
// ID contains default service name.
@@ -20,7 +20,7 @@ type Plugin struct {
// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer) error {
+func (s *Plugin) Init(cfg config2.Configurer) error {
const op = errors.Op("headers plugin init")
err := cfg.UnmarshalKey(RootPluginName, &s.cfg)
if err != nil {
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 460263f6..a883735a 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -15,6 +15,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/spiral/endure"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/pool"
@@ -22,7 +23,6 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/status"
"github.com/spiral/roadrunner/v2/interfaces/worker"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/util"
"golang.org/x/net/http2"
@@ -49,7 +49,7 @@ type middleware map[string]Middleware
type Plugin struct {
sync.Mutex
- configurer config.Configurer
+ configurer config2.Configurer
server server.Server
log log.Logger
@@ -80,7 +80,7 @@ func (s *Plugin) AddListener(listener events.EventListener) {
// Init must return configure svc and return true if svc hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, server server.Server) error {
+func (s *Plugin) Init(cfg config2.Configurer, log log.Logger, server server.Server) error {
const op = errors.Op("http Init")
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
diff --git a/plugins/http/tests/plugin1.go b/plugins/http/tests/plugin1.go
index 1cbca744..7d1f32a1 100644
--- a/plugins/http/tests/plugin1.go
+++ b/plugins/http/tests/plugin1.go
@@ -1,12 +1,14 @@
package tests
-import "github.com/spiral/roadrunner/v2/plugins/config"
+import (
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
+)
type Plugin1 struct {
- config config.Configurer
+ config config2.Configurer
}
-func (p1 *Plugin1) Init(cfg config.Configurer) error {
+func (p1 *Plugin1) Init(cfg config2.Configurer) error {
p1.config = cfg
return nil
}
diff --git a/plugins/http/tests/plugin_middleware.go b/plugins/http/tests/plugin_middleware.go
index de829d34..224d4117 100644
--- a/plugins/http/tests/plugin_middleware.go
+++ b/plugins/http/tests/plugin_middleware.go
@@ -3,14 +3,14 @@ package tests
import (
"net/http"
- "github.com/spiral/roadrunner/v2/plugins/config"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
)
type PluginMiddleware struct {
- config config.Configurer
+ config config2.Configurer
}
-func (p *PluginMiddleware) Init(cfg config.Configurer) error {
+func (p *PluginMiddleware) Init(cfg config2.Configurer) error {
p.config = cfg
return nil
}
@@ -34,10 +34,10 @@ func (p *PluginMiddleware) Name() string {
}
type PluginMiddleware2 struct {
- config config.Configurer
+ config config2.Configurer
}
-func (p *PluginMiddleware2) Init(cfg config.Configurer) error {
+func (p *PluginMiddleware2) Init(cfg config2.Configurer) error {
p.config = cfg
return nil
}
diff --git a/plugins/informer/tests/test_plugin.go b/plugins/informer/tests/test_plugin.go
index 3fdefde3..80627801 100644
--- a/plugins/informer/tests/test_plugin.go
+++ b/plugins/informer/tests/test_plugin.go
@@ -4,10 +4,10 @@ import (
"context"
"time"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/interfaces/worker"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
var testPoolConfig = poolImpl.Config{
@@ -26,11 +26,11 @@ var testPoolConfig = poolImpl.Config{
// Gauge //////////////
type Plugin1 struct {
- config config.Configurer
+ config config2.Configurer
server server.Server
}
-func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error {
+func (p1 *Plugin1) Init(cfg config2.Configurer, server server.Server) error {
p1.config = cfg
p1.server = server
return nil
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go
index 64b77a64..ec58b7d6 100644
--- a/plugins/logger/plugin.go
+++ b/plugins/logger/plugin.go
@@ -2,8 +2,8 @@ package logger
import (
"github.com/spiral/endure"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/plugins/config"
"go.uber.org/zap"
)
@@ -18,7 +18,7 @@ type ZapLogger struct {
}
// Init logger service.
-func (z *ZapLogger) Init(cfg config.Configurer) error {
+func (z *ZapLogger) Init(cfg config2.Configurer) error {
err := cfg.UnmarshalKey(PluginName, &z.cfg)
if err != nil {
return err
diff --git a/plugins/logger/tests/plugin.go b/plugins/logger/tests/plugin.go
index 32238f63..4095e59d 100644
--- a/plugins/logger/tests/plugin.go
+++ b/plugins/logger/tests/plugin.go
@@ -2,16 +2,16 @@ package tests
import (
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
type Plugin struct {
- config config.Configurer
+ config config2.Configurer
log log.Logger
}
-func (p1 *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (p1 *Plugin) Init(cfg config2.Configurer, log log.Logger) error {
p1.config = cfg
p1.log = log
return nil
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index c115826b..956166ee 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -11,9 +11,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spiral/endure"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/metrics"
- "github.com/spiral/roadrunner/v2/plugins/config"
"golang.org/x/sys/cpu"
)
@@ -40,7 +40,7 @@ type Plugin struct {
}
// Init service.
-func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (m *Plugin) Init(cfg config2.Configurer, log log.Logger) error {
const op = errors.Op("Metrics Init")
err := cfg.UnmarshalKey(PluginName, &m.cfg)
if err != nil {
diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go
index b48c415d..08dd2593 100644
--- a/plugins/metrics/tests/plugin1.go
+++ b/plugins/metrics/tests/plugin1.go
@@ -2,15 +2,15 @@ package tests
import (
"github.com/prometheus/client_golang/prometheus"
- "github.com/spiral/roadrunner/v2/plugins/config"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
)
// Gauge //////////////
type Plugin1 struct {
- config config.Configurer
+ config config2.Configurer
}
-func (p1 *Plugin1) Init(cfg config.Configurer) error {
+func (p1 *Plugin1) Init(cfg config2.Configurer) error {
p1.config = cfg
return nil
}
diff --git a/plugins/redis/config.go b/plugins/redis/config.go
new file mode 100644
index 00000000..ebcefed1
--- /dev/null
+++ b/plugins/redis/config.go
@@ -0,0 +1,32 @@
+package redis
+
+import "time"
+
+type Config struct {
+ Addrs []string `yaml:"addrs"`
+ DB int `yaml:"db"`
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ MasterName string `yaml:"master_name"`
+ SentinelPassword string `yaml:"sentinel_password"`
+ RouteByLatency bool `yaml:"route_by_latency"`
+ RouteRandomly bool `yaml:"route_randomly"`
+ MaxRetries int `yaml:"max_retries"`
+ DialTimeout time.Duration `yaml:"dial_timeout"`
+ MinRetryBackoff time.Duration `yaml:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `yaml:"max_retry_backoff"`
+ PoolSize int `yaml:"pool_size"`
+ MinIdleConns int `yaml:"min_idle_conns"`
+ MaxConnAge time.Duration `yaml:"max_conn_age"`
+ ReadTimeout time.Duration `yaml:"read_timeout"`
+ WriteTimeout time.Duration `yaml:"write_timeout"`
+ PoolTimeout time.Duration `yaml:"pool_timeout"`
+ IdleTimeout time.Duration `yaml:"idle_timeout"`
+ IdleCheckFreq time.Duration `yaml:"idle_check_freq"`
+ ReadOnly bool `yaml:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
new file mode 100644
index 00000000..08bb7972
--- /dev/null
+++ b/plugins/redis/plugin.go
@@ -0,0 +1,75 @@
+package redis
+
+import (
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/config"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+)
+
+const PluginName = "redis"
+
+type Plugin struct {
+ // config for RR integration
+ cfg *Config
+ // logger
+ log log.Logger
+ // redis universal client
+ universalClient redis.UniversalClient
+}
+
+func (s *Plugin) GetClient() redis.UniversalClient {
+ return s.universalClient
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("redis plugin init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ s.log = log
+
+ s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: s.cfg.Addrs,
+ DB: s.cfg.DB,
+ Username: s.cfg.Username,
+ Password: s.cfg.Password,
+ SentinelPassword: s.cfg.SentinelPassword,
+ MaxRetries: s.cfg.MaxRetries,
+ MinRetryBackoff: s.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: s.cfg.MaxRetryBackoff,
+ DialTimeout: s.cfg.DialTimeout,
+ ReadTimeout: s.cfg.ReadTimeout,
+ WriteTimeout: s.cfg.WriteTimeout,
+ PoolSize: s.cfg.PoolSize,
+ MinIdleConns: s.cfg.MinIdleConns,
+ MaxConnAge: s.cfg.MaxConnAge,
+ PoolTimeout: s.cfg.PoolTimeout,
+ IdleTimeout: s.cfg.IdleTimeout,
+ IdleCheckFrequency: s.cfg.IdleCheckFreq,
+ ReadOnly: s.cfg.ReadOnly,
+ RouteByLatency: s.cfg.RouteByLatency,
+ RouteRandomly: s.cfg.RouteRandomly,
+ MasterName: s.cfg.MasterName,
+ })
+
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+func (s Plugin) Stop() error {
+ return s.universalClient.Close()
+}
+
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/redis/tests/plugin1.go b/plugins/redis/tests/plugin1.go
new file mode 100644
index 00000000..e19ca90a
--- /dev/null
+++ b/plugins/redis/tests/plugin1.go
@@ -0,0 +1,43 @@
+package tests
+
+import (
+ "context"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ redisPlugin "github.com/spiral/roadrunner/v2/interfaces/redis"
+)
+
+type Plugin1 struct {
+ redisClient redis.UniversalClient
+}
+
+func (p *Plugin1) Init(redis redisPlugin.Redis) error {
+ p.redisClient = redis.GetClient()
+ return nil
+}
+
+func (p *Plugin1) Serve() chan error {
+ const op = errors.Op("plugin1 serve")
+ errCh := make(chan error, 1)
+ p.redisClient.Set(context.Background(), "foo", "bar", time.Minute)
+
+ stringCmd := p.redisClient.Get(context.Background(), "foo")
+ data, err := stringCmd.Result()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ if data != "bar" {
+ errCh <- errors.E(op, errors.Str("no such key"))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (p *Plugin1) Stop() error {
+ return p.redisClient.Close()
+}
diff --git a/plugins/redis/tests/redis_plugin_test.go b/plugins/redis/tests/redis_plugin_test.go
new file mode 100644
index 00000000..8f8da983
--- /dev/null
+++ b/plugins/redis/tests/redis_plugin_test.go
@@ -0,0 +1,124 @@
+package tests
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/alicebob/miniredis/v2"
+ "github.com/golang/mock/gomock"
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/mocks"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ "github.com/stretchr/testify/assert"
+)
+
+func redisConfig(port string) string {
+ cfg := `
+redis:
+ addrs:
+ - 'localhost:%s'
+ master_name: ''
+ username: ''
+ password: ''
+ db: 0
+ sentinel_password: ''
+ route_by_latency: false
+ route_randomly: false
+ dial_timeout: 0
+ max_retries: 1
+ min_retry_backoff: 0
+ max_retry_backoff: 0
+ pool_size: 0
+ min_idle_conns: 0
+ max_conn_age: 0
+ read_timeout: 0
+ write_timeout: 0
+ pool_timeout: 0
+ idle_timeout: 0
+ idle_check_freq: 0
+ read_only: false
+`
+ return fmt.Sprintf(cfg, port)
+}
+
+func TestRedisInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ s, err := miniredis.Run()
+ if err != nil {
+ panic(err)
+ }
+ defer s.Close()
+
+ c := redisConfig(s.Port())
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-redis.yaml"
+ cfg.ReadInCfg = []byte(c)
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &redis.Plugin{},
+ &Plugin1{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ tt := time.NewTimer(time.Second * 10)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ wg.Wait()
+}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index 555ddb82..233c83a4 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/resetter"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
// PluginName contains default plugin name.
@@ -25,7 +25,7 @@ type Plugin struct {
}
// Init controller service
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger, res resetter.Resetter) error {
+func (s *Plugin) Init(cfg config2.Configurer, log log.Logger, res resetter.Resetter) error {
const op = errors.Op("reload plugin init")
s.cfg = &Config{}
InitDefaults(s.cfg)
diff --git a/plugins/resetter/tests/test_plugin.go b/plugins/resetter/tests/test_plugin.go
index 1d770e70..f1c09caf 100644
--- a/plugins/resetter/tests/test_plugin.go
+++ b/plugins/resetter/tests/test_plugin.go
@@ -4,9 +4,9 @@ import (
"context"
"time"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/server"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
var testPoolConfig = poolImpl.Config{
@@ -25,11 +25,11 @@ var testPoolConfig = poolImpl.Config{
// Gauge //////////////
type Plugin1 struct {
- config config.Configurer
+ config config2.Configurer
server server.Server
}
-func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error {
+func (p1 *Plugin1) Init(cfg config2.Configurer, server server.Server) error {
p1.config = cfg
p1.server = server
return nil
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 98242ade..d0dc0ff1 100755
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -8,9 +8,9 @@ import (
"github.com/spiral/endure"
"github.com/spiral/errors"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
// PluginName contains default plugin name.
diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go
index 79e98ed4..dcb256fa 100644
--- a/plugins/rpc/tests/plugin1.go
+++ b/plugins/rpc/tests/plugin1.go
@@ -3,14 +3,14 @@ package tests
import (
"fmt"
- "github.com/spiral/roadrunner/v2/plugins/config"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
)
type Plugin1 struct {
- config config.Configurer
+ config config2.Configurer
}
-func (p1 *Plugin1) Init(cfg config.Configurer) error {
+func (p1 *Plugin1) Init(cfg config2.Configurer) error {
p1.config = cfg
return nil
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e6003fbc..580c1e10 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -8,6 +8,7 @@ import (
"strings"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/interfaces/pool"
@@ -16,7 +17,6 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pipe"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/socket"
- "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
@@ -30,7 +30,7 @@ type Plugin struct {
}
// Init application provider.
-func (server *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (server *Plugin) Init(cfg config2.Configurer, log log.Logger) error {
const op = errors.Op("Init")
err := cfg.UnmarshalKey(PluginName, &server.cfg)
if err != nil {
@@ -62,7 +62,7 @@ func (server *Plugin) Stop() error {
return nil
}
- return server.factory.Close(context.Background())
+ return server.factory.Close()
}
// CmdFactory provides worker command factory assocated with given context.
@@ -105,7 +105,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.Bas
return nil, errors.E(op, err)
}
- w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+ w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd())
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index 9a8a630c..f49cf6dc 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -5,12 +5,12 @@ import (
"time"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/pkg/payload"
poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
@@ -32,12 +32,12 @@ var testPoolConfig = poolImpl.Config{
}
type Foo struct {
- configProvider config.Configurer
+ configProvider config2.Configurer
wf server.Server
pool pool.Pool
}
-func (f *Foo) Init(p config.Configurer, workerFactory server.Server) error {
+func (f *Foo) Init(p config2.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -99,7 +99,7 @@ func (f *Foo) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index b1545718..ee971e45 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -4,21 +4,21 @@ import (
"context"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
type Foo2 struct {
- configProvider config.Configurer
+ configProvider config2.Configurer
wf server.Server
pool pool.Pool
}
-func (f *Foo2) Init(p config.Configurer, workerFactory server.Server) error {
+func (f *Foo2) Init(p config2.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -79,7 +79,7 @@ func (f *Foo2) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index da92288a..cdf23e21 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -4,21 +4,21 @@ import (
"context"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/pool"
"github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/config"
plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
type Foo3 struct {
- configProvider config.Configurer
+ configProvider config2.Configurer
wf server.Server
pool pool.Pool
}
-func (f *Foo3) Init(p config.Configurer, workerFactory server.Server) error {
+func (f *Foo3) Init(p config2.Configurer, workerFactory server.Server) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -79,7 +79,7 @@ func (f *Foo3) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/static/plugin.go b/plugins/static/plugin.go
index cf5cee25..fd8d0a9c 100644
--- a/plugins/static/plugin.go
+++ b/plugins/static/plugin.go
@@ -5,8 +5,8 @@ import (
"path"
"github.com/spiral/errors"
+ config2 "github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
- "github.com/spiral/roadrunner/v2/plugins/config"
)
// ID contains default service name.
@@ -27,7 +27,7 @@ type Plugin struct {
// Init must return configure service and return true if service hasStatus enabled. Must return error in case of
// misconfiguration. Services must not be used without proper configuration pushed first.
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+func (s *Plugin) Init(cfg config2.Configurer, log log.Logger) error {
const op = errors.Op("static plugin init")
err := cfg.UnmarshalKey(RootPluginName, &s.cfg)
if err != nil {