diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 20:09:01 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-02 20:09:01 +0300 |
commit | 6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch) | |
tree | f6f92c9f0016f6bcac6a9aa45ccc961eebf90018 | |
parent | 0437d1f58514f694ea86e8176e621c009cd510f9 (diff) | |
parent | 4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff) |
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
50 files changed, 384 insertions, 646 deletions
diff --git a/.golangci.yml b/.golangci.yml index 55186659..f6ead63e 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -74,7 +74,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/ - structcheck # Finds unused struct fields - stylecheck # Stylecheck is a replacement for golint - tparallel # detects inappropriate usage of t.Parallel() method in your Go test codes - - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code - unconvert # Remove unnecessary type conversions - unused # Checks Go code for unused constants, variables, functions and types - varcheck # Finds unused global variables and constants diff --git a/CHANGELOG.md b/CHANGELOG.md index fcec90cb..897877d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ CHANGELOG ========= -v2.4.0 (_.08.2021) +v2.4.0 (02.09.2021) ------------------- ## 💔 Internal BC: @@ -10,20 +10,27 @@ v2.4.0 (_.08.2021) ## 👀 New: -- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. - Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) +- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) - ✏️ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) - ✏️ Support for the Docker images via GitHub packages. -- ✏️ Go 1.17 support. +- ✏️ Go 1.17 support for the all spiral packages. ## 🩹 Fixes: - 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750). - 🐛 Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). +- 🐛 Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79). + +## 📦 Packages: + +- 📦 Update goridge to `v3.2.1` +- 📦 Update temporal to `v1.0.9` +- 📦 Update endure to `v1.0.4` ## 📈 Summary: -- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29) +- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1) +- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1) --- @@ -15,15 +15,15 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload @@ -35,7 +35,6 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt @@ -50,10 +49,15 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/worker_watcher go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./pkg/priority_queue - go test -v -race -tags=debug ./plugins/jobs/job go test -v -race -tags=debug ./plugins/jobs/pipeline go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./plugins/server + go test -v -race -tags=debug ./plugins/jobs/job + go test -v -race -tags=debug ./tests/plugins/jobs + go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/broadcast + go test -v -race -tags=debug ./tests/plugins/websockets + go test -v -race -tags=debug ./plugins/websockets go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./tests/plugins/informer go test -v -race -tags=debug ./tests/plugins/reload @@ -65,12 +69,6 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/headers go test -v -race -tags=debug ./tests/plugins/logger go test -v -race -tags=debug ./tests/plugins/metrics - go test -v -race -tags=debug ./tests/plugins/redis go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc - go test -v -race -tags=debug ./tests/plugins/kv - go test -v -race -tags=debug ./tests/plugins/broadcast - go test -v -race -tags=debug ./tests/plugins/websockets - go test -v -race -tags=debug ./plugins/websockets - go test -v -race -tags=debug ./tests/plugins/jobs docker-compose -f tests/env/docker-compose.yaml down diff --git a/common/kv/interface.go b/common/kv/interface.go index 5736a6a7..bc6a07b2 100644 --- a/common/kv/interface.go +++ b/common/kv/interface.go @@ -30,6 +30,9 @@ type Storage interface { // Delete one or multiple keys. Delete(keys ...string) error + + // Stop the storage driver + Stop() } // Constructor provides storage based on the config @@ -4,7 +4,6 @@ go 1.17 require ( github.com/Shopify/toxiproxy v2.1.4+incompatible - github.com/alicebob/miniredis/v2 v2.15.1 // ========= AWS SDK v2 github.com/aws/aws-sdk-go-v2 v1.9.0 github.com/aws/aws-sdk-go-v2/config v1.7.0 @@ -25,10 +24,10 @@ require ( github.com/klauspost/compress v1.13.5 github.com/prometheus/client_golang v1.11.0 github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 - github.com/shirou/gopsutil v3.21.7+incompatible + github.com/shirou/gopsutil v3.21.8+incompatible github.com/spf13/viper v1.8.1 // SPIRAL ==== - github.com/spiral/endure v1.0.3 + github.com/spiral/endure v1.0.4 github.com/spiral/errors v1.0.12 github.com/spiral/goridge/v3 v3.2.1 // =========== @@ -40,14 +39,13 @@ require ( go.uber.org/zap v1.19.0 golang.org/x/net v0.0.0-20210825183410-e898025ed96a golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf + golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) require ( github.com/StackExchange/wmi v1.2.1 // indirect - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/andybalholm/brotli v1.0.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect @@ -86,7 +84,6 @@ require ( github.com/valyala/fasthttp v1.29.0 // indirect github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.5 // indirect @@ -49,10 +49,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k= -github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM= @@ -358,8 +354,8 @@ github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVz github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o= github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shirou/gopsutil v3.21.7+incompatible h1:g/wcPHcuCQvHSePVofjQljd2vX4ty0+J6VoMB+NPcdk= -github.com/shirou/gopsutil v3.21.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU= +github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -379,8 +375,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= -github.com/spiral/endure v1.0.3 h1:07E4MkwHMOJyjotHlTq56SsEK0aCsVslkzR106aj9hk= -github.com/spiral/endure v1.0.3/go.mod h1:b2hAQBpsyuDL3LDg2dLTs2htYhlY+hLwBgGE075B6yU= +github.com/spiral/endure v1.0.4 h1:qpProWUVuu6fRceMnIHs9SkpkjlzAxPl7UxSH6zUPDo= +github.com/spiral/endure v1.0.4/go.mod h1:I9IoSCMtqXVmXX0TQ3Gu72Z1uIDVNKlhKXmcCoqnR/w= github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s= github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA= github.com/spiral/goridge/v3 v3.2.1 h1:5IJofcvWYjAy+X5XevOhwf/8F0i0Bu/baPsBGiSgqzU= @@ -419,9 +415,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= @@ -567,7 +560,6 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -622,8 +614,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= -golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c= +golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + start := time.Now() + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -3,6 +3,7 @@ package beanstalk import ( "bytes" "context" + "encoding/gob" "strconv" "strings" "sync/atomic" @@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { bb := new(bytes.Buffer) bb.Grow(64) - err := item.pack(bb) + err := gob.NewEncoder(bb).Encode(item) if err != nil { return errors.E(op, err) } + body := make([]byte, bb.Len()) + copy(body, bb.Bytes()) + bb.Reset() + bb = nil + // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458 // <pri> is an integer < 2**32. Jobs with smaller priority values will be // scheduled before jobs with larger priorities. The most urgent priority is 0; @@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout) + id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout) if err != nil { errD := j.pool.Delete(ctx, id) if errD != nil { @@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..03060994 100644 --- a/plugins/beanstalk/item.go +++ b/plugins/beanstalk/item.go @@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item { } } -func (i *Item) pack(b *bytes.Buffer) error { - err := gob.NewEncoder(b).Encode(i) - if err != nil { - return err - } - - return nil -} - func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 7c161555..081d3f57 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -3,6 +3,7 @@ package boltjobs import ( "bytes" "encoding/gob" + "sync/atomic" "time" "github.com/spiral/roadrunner/v2/utils" @@ -18,6 +19,10 @@ func (c *consumer) listener() { c.log.Info("boltdb listener stopped") return case <-tt.C: + if atomic.LoadUint64(c.active) > uint64(c.prefetch) { + time.Sleep(time.Second) + continue + } tx, err := c.db.Begin(true) if err != nil { c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go index ba1450cd..656d572e 100644 --- a/plugins/boltdb/boltkv/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -38,7 +38,7 @@ type Driver struct { stop chan struct{} } -func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_boltdb_driver") if !cfgPlugin.Has(RootPluginName) { @@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, d := &Driver{ log: log, - stop: stop, + stop: make(chan struct{}), } err := cfgPlugin.UnmarshalKey(key, &d.cfg) @@ -411,6 +411,10 @@ func (d *Driver) Clear() error { return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ========================= PRIVATE ================================= func (d *Driver) startGCLoop() { //nolint:gocognit diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md index 317aec90..1424e586 100644 --- a/plugins/boltdb/doc/job_lifecycle.md +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -7,4 +7,3 @@ There are several boltdb buckets: get into the `InQueueBucket` waiting to acknowledgement. 3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. -`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go index 683b26f1..ad98cf3c 100644 --- a/plugins/boltdb/plugin.go +++ b/plugins/boltdb/plugin.go @@ -19,19 +19,14 @@ const ( // Plugin BoltDB K/V storage. type Plugin struct { - cfgPlugin config.Configurer + cfg config.Configurer // logger log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.stop = make(chan struct{}) p.log = log - p.cfgPlugin = cfg + p.cfg = cfg return nil } @@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } @@ -60,23 +49,20 @@ func (p *Plugin) Available() {} func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg) if err != nil { return nil, errors.E(op, err) } - // save driver number to release resources after Stop - p.drivers++ - return st, nil } // JOBS bbolt implementation func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue) } func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { - return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) + return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue) } diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go deleted file mode 100644 index 28495abb..00000000 --- a/plugins/ephemeral/plugin.go +++ /dev/null @@ -1,41 +0,0 @@ -package ephemeral - -import ( - "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/events" - priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "ephemeral" -) - -type Plugin struct { - log logger.Logger - cfg config.Configurer -} - -func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - p.log = log - p.cfg = cfg - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} - -// JobsConstruct creates new ephemeral consumer from the configuration -func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewJobBroker(configKey, p.log, p.cfg, e, pq) -} - -// FromPipeline creates new ephemeral consumer from the provided pipeline -func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipeline, p.log, e, pq) -} diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go index 06c3254e..adab2a0a 100644 --- a/plugins/jobs/job/job.go +++ b/plugins/jobs/job/job.go @@ -45,17 +45,6 @@ type Options struct { Delay int64 `json:"delay,omitempty"` } -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - // DelayDuration returns delay duration in a form of time.Duration. func (o *Options) DelayDuration() time.Duration { return time.Second * time.Duration(o.Delay) diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go index a47151a3..4a95e27d 100644 --- a/plugins/jobs/job/job_test.go +++ b/plugins/jobs/job/job_test.go @@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) { opts := &Options{Delay: 1} assert.Equal(t, time.Second, opts.DelayDuration()) } - -func TestOptions_Merge(t *testing.T) { - opts := &Options{} - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(2), opts.Delay) -} - -func TestOptions_MergeKeepOriginal(t *testing.T) { - opts := &Options{ - Pipeline: "default", - Delay: 10, - } - - opts.Merge(&Options{ - Pipeline: "pipeline", - Delay: 2, - }) - - assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Delay) -} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 3f3fa196..3aec6acc 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -41,7 +41,7 @@ type Plugin struct { server server.Server jobConstructors map[string]jobs.Constructor - consumers map[string]jobs.Consumer + consumers sync.Map // map[string]jobs.Consumer // events handler events events.Handler @@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events.AddListener(p.collectJobsEvents) p.jobConstructors = make(map[string]jobs.Constructor) - p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) @@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // config key for the particular sub-driver jobs.pipelines.test-local configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name) // init the driver - initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue) if err != nil { errCh <- errors.E(op, err) return false } // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[name] = initializedDriver + p.consumers.Store(name, initializedDriver) // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipe) @@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } func (p *Plugin) Stop() error { - for k, v := range p.consumers { + // range over all consumers and call stop + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := v.Stop(ctx) + err := consumer.Stop(ctx) if err != nil { cancel() - p.log.Error("stop job driver", "driver", k) - continue + p.log.Error("stop job driver", "driver", key) + return true } cancel() - } + return true + }) // this function can block forever, but we don't care, because we might have a chance to exit from the pollers, // but if not, this is not a problem at all. @@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State { func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) { const op = errors.Op("jobs_plugin_drivers_state") - jst := make([]*jobState.State, 0, len(p.consumers)) - for k := range p.consumers { - d := p.consumers[k] + jst := make([]*jobState.State, 0, 2) + var err error + p.consumers.Range(func(key, value interface{}) bool { + consumer := value.(jobs.Consumer) newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout)) - state, err := d.State(newCtx) + + var state *jobState.State + state, err = consumer.State(newCtx) if err != nil { cancel() - return nil, errors.E(op, err) + return false } jst = append(jst, state) cancel() + return true + }) + + if err != nil { + return nil, errors.E(op, err) } return jst, nil } @@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } // if job has no priority, inherit it from the pipeline - // TODO(rustatian) merge all options, not only priority if j.Options.Priority == 0 { j.Options.Priority = ppl.Priority() } @@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() - err := d.Push(ctx, j) + err := d.(jobs.Consumer).Push(ctx, j) if err != nil { p.events.Push(events.JobEvent{ Event: events.EventPushError, ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return errors.E(op, err) } @@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error { ID: j.Ident, Pipeline: ppl.Name(), Driver: ppl.Driver(), + Error: err, Start: start, Elapsed: time.Since(start), - Error: err, }) return nil @@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error { func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() for i := 0; i < len(j); i++ { - start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } @@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - err := d.Push(ctx, j[i]) + err := d.(jobs.Consumer).Push(ctx, j[i]) if err != nil { cancel() p.events.Push(events.JobEvent{ @@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Pause(ctx, ppl.Name()) + d.(jobs.Consumer).Pause(ctx, ppl.Name()) } func (p *Plugin) Resume(pp string) { @@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) { ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + d, ok := p.consumers.Load(ppl.Name()) if !ok { p.log.Warn("driver for the pipeline not found", "pipeline", pp) return @@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) defer cancel() // redirect call to the underlying driver - d.Resume(ctx, ppl.Name()) + d.(jobs.Consumer).Resume(ctx, ppl.Name()) } // Declare a pipeline. @@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { // jobConstructors contains constructors for the drivers // we need here to initialize these drivers for the pipelines - if c, ok := p.jobConstructors[dr]; ok { + if _, ok := p.jobConstructors[dr]; ok { // init the driver from pipeline - initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue) + initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue) if err != nil { return errors.E(op, err) } - // add driver to the set of the consumers (name - pipeline name, value - associated driver) - p.consumers[pipeline.Name()] = initializedDriver - // register pipeline for the initialized driver err = initializedDriver.Register(context.Background(), pipeline) if err != nil { @@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error { return errors.E(op, err) } } - } - // save the pipeline - p.pipelines.Store(pipeline.Name(), pipeline) + // add driver to the set of the consumers (name - pipeline name, value - associated driver) + p.consumers.Store(pipeline.Name(), initializedDriver) + // save the pipeline + p.pipelines.Store(pipeline.Name(), pipeline) + } return nil } @@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error { // type conversion ppl := pipe.(*pipeline.Pipeline) - d, ok := p.consumers[ppl.Name()] + // delete consumer + d, ok := p.consumers.LoadAndDelete(ppl.Name()) if !ok { return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver())) } - // delete consumer - delete(p.consumers, ppl.Name()) - p.pipelines.Delete(pp) + // delete old pipeline + p.pipelines.LoadAndDelete(pp) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout)) - defer cancel() + err := d.(jobs.Consumer).Stop(ctx) + if err != nil { + cancel() + return errors.E(op, err) + } - return d.Stop(ctx) + cancel() + return nil } func (p *Plugin) List() []string { diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 94f903d5..d7b93bd1 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error { return errors.E(op, errors.Str("empty ID field not allowed")) } - err := r.p.Push(r.from(j.GetJob())) + err := r.p.Push(from(j.GetJob())) if err != nil { return errors.E(op, err) } @@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err for i := 0; i < l; i++ { // convert transport entity into domain // how we can do this quickly - batch[i] = r.from(j.GetJobs()[i]) + batch[i] = from(j.GetJobs()[i]) } err := r.p.PushBatch(batch) @@ -137,8 +137,8 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { } // from converts from transport entity to domain -func (r *rpc) from(j *jobsv1beta.Job) *job.Job { - headers := map[string][]string{} +func from(j *jobsv1beta.Job) *job.Job { + headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { headers[k] = v.GetValue() diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index c6ca96c3..a1144b85 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { + // stop all attached storages + for k := range p.storages { + p.storages[k].Stop() + } return nil } diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go index 6d413790..569e2573 100644 --- a/plugins/memcached/config.go +++ b/plugins/memcached/memcachedkv/config.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv type Config struct { // Addr is url for memcached, 11211 port is used by default diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go index e24747fe..6d5e1802 100644 --- a/plugins/memcached/driver.go +++ b/plugins/memcached/memcachedkv/driver.go @@ -1,4 +1,4 @@ -package memcached +package memcachedkv import ( "strings" @@ -246,3 +246,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..47bca0e2 100644 --- a/plugins/memcached/plugin.go +++ b/plugins/memcached/plugin.go @@ -5,6 +5,7 @@ import ( "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv" ) const ( @@ -39,7 +40,7 @@ func (s *Plugin) Available() {} func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") - st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go index 8870bb0f..fbdedefe 100644 --- a/plugins/ephemeral/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" @@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { - jb := &consumer{ - log: log, - pq: pq, - eh: eh, - goroutines: 0, - active: utils.Int64(0), - delayed: utils.Int64(0), - stopCh: make(chan struct{}, 1), - } - - // initialize a local queue - jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) - - return jb, nil + return &consumer{ + log: log, + pq: pq, + eh: eh, + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), + goroutines: 0, + active: utils.Int64(0), + delayed: utils.Int64(0), + stopCh: make(chan struct{}), + }, nil } func (c *consumer) Push(ctx context.Context, jb *job.Job) error { @@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) { c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) } @@ -186,17 +185,28 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} + select { + case c.stopCh <- struct{}{}: + default: + break + } + + for i := 0; i < len(c.localPrefetch); i++ { + // drain all jobs from the channel + <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, + Driver: pipe.Driver(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -219,10 +229,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error { time.Sleep(jj.Options.DelayDuration()) - // send the item after timeout expired - c.localPrefetch <- jj - - atomic.AddUint64(&c.goroutines, ^uint64(0)) + select { + case c.localPrefetch <- jj: + atomic.AddUint64(&c.goroutines, ^uint64(0)) + default: + c.log.Warn("can't push job", "error", "local queue closed or full") + } }(msg) return nil @@ -247,7 +259,7 @@ func (c *consumer) consume() { select { case item, ok := <-c.localPrefetch: if !ok { - c.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue closed") return } diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go index 3298424d..f4d62ada 100644 --- a/plugins/ephemeral/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -1,4 +1,4 @@ -package ephemeral +package memoryjobs import ( "context" @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go index e51d09c5..a8a8993f 100644 --- a/plugins/memory/config.go +++ b/plugins/memory/memorykv/config.go @@ -1,4 +1,4 @@ -package memory +package memorykv // Config is default config for the in-memory driver type Config struct { diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go index 68ea7266..9b3e176c 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/memorykv/kv.go @@ -1,4 +1,4 @@ -package memory +package memorykv import ( "strings" @@ -20,11 +20,11 @@ type Driver struct { cfg *Config } -func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { +func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) { const op = errors.Op("new_in_memory_driver") d := &Driver{ - stop: stop, + stop: make(chan struct{}), log: log, } @@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure return d, nil } -func (s *Driver) Has(keys ...string) (map[string]bool, error) { +func (d *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in_memory_plugin_has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return nil, errors.E(op, errors.EmptyKey) } - if _, ok := s.heap.Load(keys[i]); ok { + if _, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = true } } @@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) { return m, nil } -func (s *Driver) Get(key string) ([]byte, error) { +func (d *Driver) Get(key string) ([]byte, error) { const op = errors.Op("in_memory_plugin_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, errors.E(op, errors.EmptyKey) } - if data, exist := s.heap.Load(key); exist { + if data, exist := d.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function return data.(*kvv1.Item).Value, nil @@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) { return nil, nil } -func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { m := make(map[string][]byte, len(keys)) for i := range keys { - if value, ok := s.heap.Load(keys[i]); ok { + if value, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = value.(*kvv1.Item).Value } } @@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { return m, nil } -func (s *Driver) Set(items ...*kvv1.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error { } } - s.heap.Store(items[i].Key, items[i]) + d.heap.Store(items[i].Key, items[i]) } return nil } // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*kvv1.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { } // if key exist, overwrite it value - if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { + if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { @@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &kvv1.Item{ + d.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (s *Driver) TTL(keys ...string) (map[string]string, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) { m := make(map[string]string, len(keys)) for i := range keys { - if item, ok := s.heap.Load(keys[i]); ok { + if item, ok := d.heap.Load(keys[i]); ok { m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil } -func (s *Driver) Delete(keys ...string) error { +func (d *Driver) Delete(keys ...string) error { const op = errors.Op("in_memory_plugin_delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error { } for i := range keys { - s.heap.Delete(keys[i]) + d.heap.Delete(keys[i]) } return nil } -func (s *Driver) Clear() error { - s.clearMu.Lock() - s.heap = sync.Map{} - s.clearMu.Unlock() +func (d *Driver) Clear() error { + d.clearMu.Lock() + d.heap = sync.Map{} + d.clearMu.Unlock() return nil } +func (d *Driver) Stop() { + d.stop <- struct{}{} +} + // ================================== PRIVATE ====================================== -func (s *Driver) gc() { - ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second) +func (d *Driver) gc() { + ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) + defer ticker.Stop() for { select { - case <-s.stop: - ticker.Stop() + case <-d.stop: return case now := <-ticker.C: // mutes needed to clear the map - s.clearMu.RLock() + d.clearMu.RLock() // check every second - s.heap.Range(func(key, value interface{}) bool { + d.heap.Range(func(key, value interface{}) bool { v := value.(*kvv1.Item) if v.Timeout == "" { return true @@ -237,13 +241,13 @@ func (s *Driver) gc() { } if now.After(t) { - s.log.Debug("key deleted", "key", key) - s.heap.Delete(key) + d.log.Debug("key deleted", "key", key) + d.heap.Delete(key) } return true }) - s.clearMu.RUnlock() + d.clearMu.RUnlock() } } } diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go index fd30eb54..75122571 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/memorypubsub/pubsub.go @@ -1,4 +1,4 @@ -package memory +package memorypubsub import ( "context" diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 7d418a70..515e469a 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,27 +2,29 @@ package memory import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/common/kv" "github.com/spiral/roadrunner/v2/common/pubsub" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs" + "github.com/spiral/roadrunner/v2/plugins/memory/memorykv" + "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub" ) const PluginName string = "memory" type Plugin struct { - // heap is user map for the key-value pairs - stop chan struct{} - - log logger.Logger - cfgPlugin config.Configurer - drivers uint + log logger.Logger + cfg config.Configurer } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log - p.cfgPlugin = cfg - p.stop = make(chan struct{}, 1) + p.cfg = cfg return nil } @@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error { } func (p *Plugin) Stop() error { - if p.drivers > 0 { - for i := uint(0); i < p.drivers; i++ { - // send close signal to every driver - p.stop <- struct{}{} - } - } return nil } +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} + +// Drivers implementation + func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return NewPubSubDriver(p.log, key) + return memorypubsub.NewPubSubDriver(p.log, key) } func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") - st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) + st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) if err != nil { return nil, errors.E(op, err) } - - // save driver number to release resources after Stop - p.drivers++ - return st, nil } -func (p *Plugin) Name() string { - return PluginName +// JobsConstruct creates new ephemeral consumer from the configuration +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline creates new ephemeral consumer from the provided pipeline +func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return memoryjobs.FromPipeline(pipeline, p.log, e, pq) } diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go deleted file mode 100644 index d0a184d2..00000000 --- a/plugins/redis/clients.go +++ /dev/null @@ -1,84 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" -) - -// RedisClient return a client based on the provided section key -// key sample: kv.some-section.redis -// kv.redis -// redis (root) -func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { - const op = errors.Op("redis_get_client") - - if !p.cfgPlugin.Has(key) { - return nil, errors.E(op, errors.Errorf("no such section: %s", key)) - } - - cfg := &Config{} - - err := p.cfgPlugin.UnmarshalKey(key, cfg) - if err != nil { - return nil, errors.E(op, err) - } - - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc, nil -} - -func (p *Plugin) DefaultClient() redis.UniversalClient { - cfg := &Config{} - cfg.InitDefaults() - - uc := redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: cfg.Addrs, - DB: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - SentinelPassword: cfg.SentinelPassword, - MaxRetries: cfg.MaxRetries, - MinRetryBackoff: cfg.MaxRetryBackoff, - MaxRetryBackoff: cfg.MaxRetryBackoff, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolSize: cfg.PoolSize, - MinIdleConns: cfg.MinIdleConns, - MaxConnAge: cfg.MaxConnAge, - PoolTimeout: cfg.PoolTimeout, - IdleTimeout: cfg.IdleTimeout, - IdleCheckFrequency: cfg.IdleCheckFreq, - ReadOnly: cfg.ReadOnly, - RouteByLatency: cfg.RouteByLatency, - RouteRandomly: cfg.RouteRandomly, - MasterName: cfg.MasterName, - }) - - return uc -} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go deleted file mode 100644 index 189b0002..00000000 --- a/plugins/redis/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redis - -import "github.com/go-redis/redis/v8" - -// Redis in the redis KV plugin interface -type Redis interface { - // RedisClient provides universal redis client - RedisClient(key string) (redis.UniversalClient, error) - - // DefaultClient provide default redis client based on redis defaults - DefaultClient() redis.UniversalClient -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index b41cb86c..3d062fbb 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -248,3 +248,5 @@ func (d *Driver) Clear() error { return nil } + +func (d *Driver) Stop() {} diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index dfbda154..92dbd6a8 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("sqs_run") c.Lock() @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.pauseCh <- struct{}{} } @@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml index a4f31290..71b51dce 100644 --- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml @@ -7,9 +7,7 @@ server: relay_timeout: "20s" beanstalk: - # beanstalk address addr: tcp://127.0.0.1:11300 - # connect timeout timeout: 10s logs: diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index bf9f60cc..9813344e 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -45,17 +45,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 prefetch: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 prefetch: 10000 diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index 951d6227..5c521c2b 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -14,9 +14,9 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, &amqp.Plugin{}, ) assert.NoError(t, err) @@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) { &server.Plugin{}, &jobs.Plugin{}, &metrics.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, mockLogger, ) assert.NoError(t, err) @@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) { time.Sleep(time.Second * 2) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("DeclareEphemeralPipeline", declareMemoryPipe) + t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe) t.Run("PushEphemeralPipeline", pushToPipe("test-3")) time.Sleep(time.Second) t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go index 2890aa9d..20cbfb3f 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_memory_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -26,12 +26,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestEphemeralInit(t *testing.T) { +func TestMemoryInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-init.yaml", + Path: "memory/.rr-memory-init.yaml", Prefix: "rr", } @@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) { wg.Wait() } -func TestEphemeralDeclare(t *testing.T) { +func TestMemoryDeclare(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralPauseResume(t *testing.T) { +func TestMemoryPauseResume(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-pause-resume.yaml", + Path: "memory/.rr-memory-pause-resume.yaml", Prefix: "rr", } @@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) { mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) @@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("ephemeralResume", resumePipes("test-local")) - t.Run("ephemeralPause", pausePipelines("test-local")) + t.Run("Resume", resumePipes("test-local")) + t.Run("Pause", pausePipelines("test-local")) t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local")) - t.Run("ephemeralResume", resumePipes("test-local")) + t.Run("Resume", resumePipes("test-local")) t.Run("pushToEnabledPipe", pushToPipe("test-local")) time.Sleep(time.Second * 1) @@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) { wg.Wait() } -func TestEphemeralJobsError(t *testing.T) { +func TestMemoryJobsError(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-jobs-err.yaml", + Path: "memory/.rr-memory-jobs-err.yaml", Prefix: "rr", } @@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) @@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", resumePipes("test-3")) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second * 25) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} wg.Wait() } -func TestEphemeralStats(t *testing.T) { +func TestMemoryStats(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) cfg := &config.Viper{ - Path: "ephemeral/.rr-ephemeral-declare.yaml", + Path: "memory/.rr-memory-declare.yaml", Prefix: "rr", } @@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) { mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) - mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) err = cont.RegisterAll( @@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) { &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, - &ephemeral.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareEphemeralPipeline", declareEphemeralPipe) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareMemoryPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseEphemeralPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5)) - t.Run("PushEphemeralPipeline", pushToPipe("test-3")) + t.Run("PushPipeline", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(1)) @@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) { assert.Equal(t, out.Reserved, int64(0)) time.Sleep(time.Second) - t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe) + t.Run("ConsumePipeline", consumeMemoryPipe) time.Sleep(time.Second * 7) out = &jobState.State{} t.Run("Stats", stats(out)) assert.Equal(t, out.Pipeline, "test-3") - assert.Equal(t, out.Driver, "ephemeral") + assert.Equal(t, out.Driver, "memory") assert.Equal(t, out.Queue, "test-3") assert.Equal(t, out.Active, int64(0)) @@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) { wg.Wait() } -func declareEphemeralPipe(t *testing.T) { +func declareMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", + "driver": "memory", "name": "test-3", "prefetch": "10000", }} @@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) { assert.NoError(t, err) } -func consumeEphemeralPipe(t *testing.T) { +func consumeMemoryPipe(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:6001") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml index 726c24ac..726c24ac 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml index 8914dfaa..9ee8afc2 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml @@ -22,12 +22,12 @@ jobs: pipelines: test-1: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 test-2: - driver: ephemeral + driver: memory priority: 10 prefetch: 10000 diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml index 05dc3ffa..05dc3ffa 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml index e1b76263..1ad48237 100644 --- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml +++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml @@ -25,17 +25,17 @@ jobs: # list of broker pipelines associated with endpoints pipelines: test-local: - driver: ephemeral + driver: memory priority: 10 pipeline_size: 10000 test-local-2: - driver: ephemeral + driver: memory priority: 1 pipeline_size: 10000 test-local-3: - driver: ephemeral + driver: memory priority: 2 pipeline_size: 10000 diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml index f58de3e4..471e5c77 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml index 08b3bfad..b46bcb1c 100644 --- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml +++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: boltdb-south: diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml index a13b591c..6407c7ad 100644 --- a/tests/plugins/kv/configs/.rr-kv-init.yaml +++ b/tests/plugins/kv/configs/.rr-kv-init.yaml @@ -1,6 +1,9 @@ rpc: listen: tcp://127.0.0.1:6001 +logs: + mode: development + level: error kv: default: @@ -25,6 +28,3 @@ kv: memcached: driver: memcached addr: [ "127.0.0.1:11211" ] - -# redis: -# driver: redis diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go deleted file mode 100644 index 68da1394..00000000 --- a/tests/plugins/redis/plugin1.go +++ /dev/null @@ -1,45 +0,0 @@ -package redis - -import ( - "context" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis" -) - -type Plugin1 struct { - redisClient redis.UniversalClient -} - -func (p *Plugin1) Init(redis redisPlugin.Redis) error { - var err error - p.redisClient, err = redis.RedisClient("redis") - - return err -} - -func (p *Plugin1) Serve() chan error { - const op = errors.Op("plugin1 serve") - errCh := make(chan error, 1) - p.redisClient.Set(context.Background(), "foo", "bar", time.Minute) - - stringCmd := p.redisClient.Get(context.Background(), "foo") - data, err := stringCmd.Result() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - if data != "bar" { - errCh <- errors.E(op, errors.Str("no such key")) - return errCh - } - - return errCh -} - -func (p *Plugin1) Stop() error { - return nil -} diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go deleted file mode 100644 index 1b84e339..00000000 --- a/tests/plugins/redis/redis_plugin_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package redis - -import ( - "fmt" - "os" - "os/signal" - "sync" - "syscall" - "testing" - - "github.com/alicebob/miniredis/v2" - "github.com/golang/mock/gomock" - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/redis" - "github.com/spiral/roadrunner/v2/tests/mocks" - "github.com/stretchr/testify/assert" -) - -func redisConfig(port string) string { - cfg := ` -redis: - addrs: - - '127.0.0.1:%s' - master_name: '' - username: '' - password: '' - db: 0 - sentinel_password: '' - route_by_latency: false - route_randomly: false - dial_timeout: 0 - max_retries: 1 - min_retry_backoff: 0 - max_retry_backoff: 0 - pool_size: 0 - min_idle_conns: 0 - max_conn_age: 0 - read_timeout: 0 - write_timeout: 0 - pool_timeout: 0 - idle_timeout: 0 - idle_check_freq: 0 - read_only: false -` - return fmt.Sprintf(cfg, port) -} - -func TestRedisInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - s, err := miniredis.Run() - assert.NoError(t, err) - - c := redisConfig(s.Port()) - - cfg := &config.Viper{} - cfg.Type = "yaml" - cfg.ReadInCfg = []byte(c) - - controller := gomock.NewController(t) - mockLogger := mocks.NewMockLogger(controller) - - err = cont.RegisterAll( - cfg, - mockLogger, - &redis.Plugin{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - wg := &sync.WaitGroup{} - wg.Add(1) - - stopCh := make(chan struct{}, 1) - - go func() { - defer wg.Done() - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-stopCh: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - stopCh <- struct{}{} - wg.Wait() -} diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml index 5ab359d3..d256aad7 100644 --- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml +++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml @@ -1,3 +1,8 @@ logs: mode: development - level: error
\ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml index 67d935e3..d6aaa7c6 100644 --- a/tests/plugins/rpc/configs/.rr.yaml +++ b/tests/plugins/rpc/configs/.rr.yaml @@ -1,5 +1,11 @@ rpc: listen: tcp://127.0.0.1:6001 + logs: mode: development - level: error
\ No newline at end of file + level: panic + +endure: + grace_period: 120s + print_graph: false + log_level: panic |