summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 20:09:01 +0300
committerGitHub <[email protected]>2021-09-02 20:09:01 +0300
commit6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch)
treef6f92c9f0016f6bcac6a9aa45ccc961eebf90018
parent0437d1f58514f694ea86e8176e621c009cd510f9 (diff)
parent4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff)
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
-rwxr-xr-x.golangci.yml1
-rw-r--r--CHANGELOG.md17
-rwxr-xr-xMakefile24
-rw-r--r--common/kv/interface.go3
-rw-r--r--go.mod9
-rw-r--r--go.sum20
-rw-r--r--plugins/amqp/amqpjobs/consumer.go22
-rw-r--r--plugins/amqp/amqpjobs/item.go7
-rw-r--r--plugins/amqp/amqpjobs/redial.go21
-rw-r--r--plugins/beanstalk/consumer.go28
-rw-r--r--plugins/beanstalk/item.go9
-rw-r--r--plugins/boltdb/boltjobs/consumer.go18
-rw-r--r--plugins/boltdb/boltjobs/listener.go5
-rw-r--r--plugins/boltdb/boltkv/driver.go8
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md1
-rw-r--r--plugins/boltdb/plugin.go24
-rw-r--r--plugins/ephemeral/plugin.go41
-rw-r--r--plugins/jobs/job/job.go11
-rw-r--r--plugins/jobs/job/job_test.go27
-rw-r--r--plugins/jobs/plugin.go96
-rw-r--r--plugins/jobs/rpc.go8
-rw-r--r--plugins/kv/plugin.go4
-rw-r--r--plugins/memcached/memcachedkv/config.go (renamed from plugins/memcached/config.go)2
-rw-r--r--plugins/memcached/memcachedkv/driver.go (renamed from plugins/memcached/driver.go)4
-rw-r--r--plugins/memcached/plugin.go3
-rw-r--r--plugins/memory/memoryjobs/consumer.go (renamed from plugins/ephemeral/consumer.go)70
-rw-r--r--plugins/memory/memoryjobs/item.go (renamed from plugins/ephemeral/item.go)3
-rw-r--r--plugins/memory/memorykv/config.go (renamed from plugins/memory/config.go)2
-rw-r--r--plugins/memory/memorykv/kv.go (renamed from plugins/memory/kv.go)66
-rw-r--r--plugins/memory/memorypubsub/pubsub.go (renamed from plugins/memory/pubsub.go)2
-rw-r--r--plugins/memory/plugin.go50
-rw-r--r--plugins/redis/clients.go84
-rw-r--r--plugins/redis/interface.go12
-rw-r--r--plugins/redis/kv/kv.go2
-rw-r--r--plugins/sqs/consumer.go16
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml2
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml6
-rw-r--r--tests/plugins/jobs/jobs_general_test.go10
-rw-r--r--tests/plugins/jobs/jobs_memory_test.go (renamed from tests/plugins/jobs/jobs_ephemeral_test.go)90
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-declare.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-init.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml)4
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml)0
-rw-r--r--tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml (renamed from tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml)6
-rw-r--r--tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml3
-rw-r--r--tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml3
-rw-r--r--tests/plugins/kv/configs/.rr-kv-init.yaml6
-rw-r--r--tests/plugins/redis/plugin1.go45
-rw-r--r--tests/plugins/redis/redis_plugin_test.go120
-rw-r--r--tests/plugins/rpc/configs/.rr-rpc-disabled.yaml7
-rw-r--r--tests/plugins/rpc/configs/.rr.yaml8
50 files changed, 384 insertions, 646 deletions
diff --git a/.golangci.yml b/.golangci.yml
index 55186659..f6ead63e 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -74,7 +74,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- structcheck # Finds unused struct fields
- stylecheck # Stylecheck is a replacement for golint
- tparallel # detects inappropriate usage of t.Parallel() method in your Go test codes
- - typecheck # Like the front-end of a Go compiler, parses and type-checks Go code
- unconvert # Remove unnecessary type conversions
- unused # Checks Go code for unused constants, variables, functions and types
- varcheck # Finds unused global variables and constants
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fcec90cb..897877d3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
CHANGELOG
=========
-v2.4.0 (_.08.2021)
+v2.4.0 (02.09.2021)
-------------------
## 💔 Internal BC:
@@ -10,20 +10,27 @@ v2.4.0 (_.08.2021)
## 👀 New:
-- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime.
- Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
+- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
- ✏️ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
- ✏️ Support for the Docker images via GitHub packages.
-- ✏️ Go 1.17 support.
+- ✏️ Go 1.17 support for the all spiral packages.
## 🩹 Fixes:
- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750).
- 🐛 Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772).
+- 🐛 Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79).
+
+## 📦 Packages:
+
+- 📦 Update goridge to `v3.2.1`
+- 📦 Update temporal to `v1.0.9`
+- 📦 Update endure to `v1.0.4`
## 📈 Summary:
-- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29)
+- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1)
+- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1)
---
diff --git a/Makefile b/Makefile
index 389c9014..8390e910 100755
--- a/Makefile
+++ b/Makefile
@@ -15,15 +15,15 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
@@ -35,7 +35,6 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/headers.txt -covermode=atomic ./tests/plugins/headers
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/logger.txt -covermode=atomic ./tests/plugins/logger
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/metrics.txt -covermode=atomic ./tests/plugins/metrics
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/redis.txt -covermode=atomic ./tests/plugins/redis
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
@@ -50,10 +49,15 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
go test -v -race -tags=debug ./pkg/priority_queue
- go test -v -race -tags=debug ./plugins/jobs/job
go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./plugins/server
+ go test -v -race -tags=debug ./plugins/jobs/job
+ go test -v -race -tags=debug ./tests/plugins/jobs
+ go test -v -race -tags=debug ./tests/plugins/kv
+ go test -v -race -tags=debug ./tests/plugins/broadcast
+ go test -v -race -tags=debug ./tests/plugins/websockets
+ go test -v -race -tags=debug ./plugins/websockets
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
@@ -65,12 +69,6 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/headers
go test -v -race -tags=debug ./tests/plugins/logger
go test -v -race -tags=debug ./tests/plugins/metrics
- go test -v -race -tags=debug ./tests/plugins/redis
go test -v -race -tags=debug ./tests/plugins/resetter
go test -v -race -tags=debug ./tests/plugins/rpc
- go test -v -race -tags=debug ./tests/plugins/kv
- go test -v -race -tags=debug ./tests/plugins/broadcast
- go test -v -race -tags=debug ./tests/plugins/websockets
- go test -v -race -tags=debug ./plugins/websockets
- go test -v -race -tags=debug ./tests/plugins/jobs
docker-compose -f tests/env/docker-compose.yaml down
diff --git a/common/kv/interface.go b/common/kv/interface.go
index 5736a6a7..bc6a07b2 100644
--- a/common/kv/interface.go
+++ b/common/kv/interface.go
@@ -30,6 +30,9 @@ type Storage interface {
// Delete one or multiple keys.
Delete(keys ...string) error
+
+ // Stop the storage driver
+ Stop()
}
// Constructor provides storage based on the config
diff --git a/go.mod b/go.mod
index a8932d87..85421a96 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,6 @@ go 1.17
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
- github.com/alicebob/miniredis/v2 v2.15.1
// ========= AWS SDK v2
github.com/aws/aws-sdk-go-v2 v1.9.0
github.com/aws/aws-sdk-go-v2/config v1.7.0
@@ -25,10 +24,10 @@ require (
github.com/klauspost/compress v1.13.5
github.com/prometheus/client_golang v1.11.0
github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891
- github.com/shirou/gopsutil v3.21.7+incompatible
+ github.com/shirou/gopsutil v3.21.8+incompatible
github.com/spf13/viper v1.8.1
// SPIRAL ====
- github.com/spiral/endure v1.0.3
+ github.com/spiral/endure v1.0.4
github.com/spiral/errors v1.0.12
github.com/spiral/goridge/v3 v3.2.1
// ===========
@@ -40,14 +39,13 @@ require (
go.uber.org/zap v1.19.0
golang.org/x/net v0.0.0-20210825183410-e898025ed96a
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
- golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf
+ golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e
google.golang.org/protobuf v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
require (
github.com/StackExchange/wmi v1.2.1 // indirect
- github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.0.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect
@@ -86,7 +84,6 @@ require (
github.com/valyala/fasthttp v1.29.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
- github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.5 // indirect
diff --git a/go.sum b/go.sum
index 776d4212..e144019b 100644
--- a/go.sum
+++ b/go.sum
@@ -49,10 +49,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
-github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
-github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
-github.com/alicebob/miniredis/v2 v2.15.1 h1:Fw+ixAJPmKhCLBqDwHlTDqxUxp0xjEwXczEpt1B6r7k=
-github.com/alicebob/miniredis/v2 v2.15.1/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM=
@@ -358,8 +354,8 @@ github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVz
github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 h1:N3Af8f13ooDKcIhsmFT7Z05CStZWu4C7Md0uDEy4q6o=
github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873/go.mod h1:dmPawKuiAeG/aFYVs2i+Dyosoo7FNcm+Pi8iK6ZUrX8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
-github.com/shirou/gopsutil v3.21.7+incompatible h1:g/wcPHcuCQvHSePVofjQljd2vX4ty0+J6VoMB+NPcdk=
-github.com/shirou/gopsutil v3.21.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
+github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU=
+github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -379,8 +375,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
-github.com/spiral/endure v1.0.3 h1:07E4MkwHMOJyjotHlTq56SsEK0aCsVslkzR106aj9hk=
-github.com/spiral/endure v1.0.3/go.mod h1:b2hAQBpsyuDL3LDg2dLTs2htYhlY+hLwBgGE075B6yU=
+github.com/spiral/endure v1.0.4 h1:qpProWUVuu6fRceMnIHs9SkpkjlzAxPl7UxSH6zUPDo=
+github.com/spiral/endure v1.0.4/go.mod h1:I9IoSCMtqXVmXX0TQ3Gu72Z1uIDVNKlhKXmcCoqnR/w=
github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s=
github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA=
github.com/spiral/goridge/v3 v3.2.1 h1:5IJofcvWYjAy+X5XevOhwf/8F0i0Bu/baPsBGiSgqzU=
@@ -419,9 +415,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
-github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
-github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
-github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
@@ -567,7 +560,6 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -622,8 +614,8 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
-golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e h1:XMgFehsDnnLGtjvjOfqWSUzt0alpTR1RSEuznObga2c=
+golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 784a102c..2ff0a40a 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("rabbit_run")
pipe := c.pipeline.Load().(*pipeline.Pipeline)
@@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Stop(context.Context) error {
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
- }
+ start := time.Now()
+ c.stopCh <- struct{}{}
pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
+
return nil
}
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index 04385afe..b837ff86 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -43,17 +43,18 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
// private
- // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
ack func(multiply bool) error
- // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+ // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
// When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
// When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
+ requeueFn func(context.Context, *Item) error
+ // delayed jobs TODO(rustatian): figure out how to get stats from the DLX
delayed *int64
multipleAsk bool
requeue bool
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 8d21784f..698a34a6 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit
c.Unlock()
case <-c.stopCh:
- if c.publishChan != nil {
- pch := <-c.publishChan
- err := pch.Close()
- if err != nil {
- c.log.Error("publish channel close", "error", err)
- }
+ pch := <-c.publishChan
+ err := pch.Close()
+ if err != nil {
+ c.log.Error("publish channel close", "error", err)
}
if c.consumeChan != nil {
- err := c.consumeChan.Close()
+ err = c.consumeChan.Close()
if err != nil {
c.log.Error("consume channel close", "error", err)
}
}
- if c.conn != nil {
- err := c.conn.Close()
- if err != nil {
- c.log.Error("amqp connection close", "error", err)
- }
+
+ err = c.conn.Close()
+ if err != nil {
+ c.log.Error("amqp connection close", "error", err)
}
return
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..30807f03 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "encoding/gob"
"strconv"
"strings"
"sync/atomic"
@@ -183,11 +184,16 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
bb := new(bytes.Buffer)
bb.Grow(64)
- err := item.pack(bb)
+ err := gob.NewEncoder(bb).Encode(item)
if err != nil {
return errors.E(op, err)
}
+ body := make([]byte, bb.Len())
+ copy(body, bb.Bytes())
+ bb.Reset()
+ bb = nil
+
// https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
// <pri> is an integer < 2**32. Jobs with smaller priority values will be
// scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -203,7 +209,7 @@ func (j *consumer) handleItem(ctx context.Context, item *Item) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ id, err := j.pool.Put(ctx, body, *j.tubePriority, item.Options.DelayDuration(), j.tout)
if err != nil {
errD := j.pool.Delete(ctx, id)
if errD != nil {
@@ -260,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
+ start := time.Now()
// load atomic value
+ // check if the pipeline registered
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
@@ -276,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Stop(context.Context) error {
+ start := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -293,13 +302,15 @@ func (j *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -322,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (j *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -351,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/beanstalk/item.go b/plugins/beanstalk/item.go
index 0a6cd560..03060994 100644
--- a/plugins/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
@@ -125,15 +125,6 @@ func fromJob(job *job.Job) *Item {
}
}
-func (i *Item) pack(b *bytes.Buffer) error {
- err := gob.NewEncoder(b).Encode(i)
- if err != nil {
- return err
- }
-
- return nil
-}
-
func (j *consumer) unpack(id uint64, data []byte, out *Item) error {
err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
if err != nil {
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index ed0eda61..62045d3b 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return &consumer{
file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
- prefetch: pipeline.Int(prefetch, 100),
+ prefetch: pipeline.Int(prefetch, 1000),
permissions: conf.Permissions,
bPool: sync.Pool{New: func() interface{} {
@@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("boltdb_run")
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.stopCh <- struct{}{}
c.stopCh <- struct{}{}
@@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 7c161555..081d3f57 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -3,6 +3,7 @@ package boltjobs
import (
"bytes"
"encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
@@ -18,6 +19,10 @@ func (c *consumer) listener() {
c.log.Info("boltdb listener stopped")
return
case <-tt.C:
+ if atomic.LoadUint64(c.active) > uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
tx, err := c.db.Begin(true)
if err != nil {
c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
diff --git a/plugins/boltdb/boltkv/driver.go b/plugins/boltdb/boltkv/driver.go
index ba1450cd..656d572e 100644
--- a/plugins/boltdb/boltkv/driver.go
+++ b/plugins/boltdb/boltkv/driver.go
@@ -38,7 +38,7 @@ type Driver struct {
stop chan struct{}
}
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
if !cfgPlugin.Has(RootPluginName) {
@@ -47,7 +47,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
d := &Driver{
log: log,
- stop: stop,
+ stop: make(chan struct{}),
}
err := cfgPlugin.UnmarshalKey(key, &d.cfg)
@@ -411,6 +411,10 @@ func (d *Driver) Clear() error {
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ========================= PRIVATE =================================
func (d *Driver) startGCLoop() { //nolint:gocognit
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
index 317aec90..1424e586 100644
--- a/plugins/boltdb/doc/job_lifecycle.md
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -7,4 +7,3 @@ There are several boltdb buckets:
get into the `InQueueBucket` waiting to acknowledgement.
3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration.
-``
diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go
index 683b26f1..ad98cf3c 100644
--- a/plugins/boltdb/plugin.go
+++ b/plugins/boltdb/plugin.go
@@ -19,19 +19,14 @@ const (
// Plugin BoltDB K/V storage.
type Plugin struct {
- cfgPlugin config.Configurer
+ cfg config.Configurer
// logger
log logger.Logger
- // stop is used to stop keys GC and close boltdb connection
- stop chan struct{}
-
- drivers uint
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.stop = make(chan struct{})
p.log = log
- p.cfgPlugin = cfg
+ p.cfg = cfg
return nil
}
@@ -41,12 +36,6 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
@@ -60,23 +49,20 @@ func (p *Plugin) Available() {}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
// JOBS bbolt implementation
func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfg, e, queue)
}
func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) {
- return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue)
+ return boltjobs.FromPipeline(pipe, p.log, p.cfg, e, queue)
}
diff --git a/plugins/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
deleted file mode 100644
index 28495abb..00000000
--- a/plugins/ephemeral/plugin.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "ephemeral"
-)
-
-type Plugin struct {
- log logger.Logger
- cfg config.Configurer
-}
-
-func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- p.log = log
- p.cfg = cfg
- return nil
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Available() {}
-
-// JobsConstruct creates new ephemeral consumer from the configuration
-func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewJobBroker(configKey, p.log, p.cfg, e, pq)
-}
-
-// FromPipeline creates new ephemeral consumer from the provided pipeline
-func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return FromPipeline(pipeline, p.log, e, pq)
-}
diff --git a/plugins/jobs/job/job.go b/plugins/jobs/job/job.go
index 06c3254e..adab2a0a 100644
--- a/plugins/jobs/job/job.go
+++ b/plugins/jobs/job/job.go
@@ -45,17 +45,6 @@ type Options struct {
Delay int64 `json:"delay,omitempty"`
}
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
// DelayDuration returns delay duration in a form of time.Duration.
func (o *Options) DelayDuration() time.Duration {
return time.Second * time.Duration(o.Delay)
diff --git a/plugins/jobs/job/job_test.go b/plugins/jobs/job/job_test.go
index a47151a3..4a95e27d 100644
--- a/plugins/jobs/job/job_test.go
+++ b/plugins/jobs/job/job_test.go
@@ -16,30 +16,3 @@ func TestOptions_DelayDuration2(t *testing.T) {
opts := &Options{Delay: 1}
assert.Equal(t, time.Second, opts.DelayDuration())
}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(2), opts.Delay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Delay)
-}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 3f3fa196..3aec6acc 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -41,7 +41,7 @@ type Plugin struct {
server server.Server
jobConstructors map[string]jobs.Constructor
- consumers map[string]jobs.Consumer
+ consumers sync.Map // map[string]jobs.Consumer
// events handler
events events.Handler
@@ -82,7 +82,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events.AddListener(p.collectJobsEvents)
p.jobConstructors = make(map[string]jobs.Constructor)
- p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
@@ -130,19 +129,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// config key for the particular sub-driver jobs.pipelines.test-local
configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
// init the driver
- initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].JobsConstruct(configKey, p.events, p.queue)
if err != nil {
errCh <- errors.E(op, err)
return false
}
// add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[name] = initializedDriver
+ p.consumers.Store(name, initializedDriver)
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipe)
@@ -331,16 +330,19 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
func (p *Plugin) Stop() error {
- for k, v := range p.consumers {
+ // range over all consumers and call stop
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := v.Stop(ctx)
+ err := consumer.Stop(ctx)
if err != nil {
cancel()
- p.log.Error("stop job driver", "driver", k)
- continue
+ p.log.Error("stop job driver", "driver", key)
+ return true
}
cancel()
- }
+ return true
+ })
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
@@ -394,18 +396,26 @@ func (p *Plugin) Workers() []*process.State {
func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
const op = errors.Op("jobs_plugin_drivers_state")
- jst := make([]*jobState.State, 0, len(p.consumers))
- for k := range p.consumers {
- d := p.consumers[k]
+ jst := make([]*jobState.State, 0, 2)
+ var err error
+ p.consumers.Range(func(key, value interface{}) bool {
+ consumer := value.(jobs.Consumer)
newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
- state, err := d.State(newCtx)
+
+ var state *jobState.State
+ state, err = consumer.State(newCtx)
if err != nil {
cancel()
- return nil, errors.E(op, err)
+ return false
}
jst = append(jst, state)
cancel()
+ return true
+ })
+
+ if err != nil {
+ return nil, errors.E(op, err)
}
return jst, nil
}
@@ -449,13 +459,12 @@ func (p *Plugin) Push(j *job.Job) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
// if job has no priority, inherit it from the pipeline
- // TODO(rustatian) merge all options, not only priority
if j.Options.Priority == 0 {
j.Options.Priority = ppl.Priority()
}
@@ -463,16 +472,16 @@ func (p *Plugin) Push(j *job.Job) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
- err := d.Push(ctx, j)
+ err := d.(jobs.Consumer).Push(ctx, j)
if err != nil {
p.events.Push(events.JobEvent{
Event: events.EventPushError,
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return errors.E(op, err)
}
@@ -482,9 +491,9 @@ func (p *Plugin) Push(j *job.Job) error {
ID: j.Ident,
Pipeline: ppl.Name(),
Driver: ppl.Driver(),
+ Error: err,
Start: start,
Elapsed: time.Since(start),
- Error: err,
})
return nil
@@ -492,9 +501,9 @@ func (p *Plugin) Push(j *job.Job) error {
func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
for i := 0; i < len(j); i++ {
- start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -503,7 +512,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
@@ -514,7 +523,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- err := d.Push(ctx, j[i])
+ err := d.(jobs.Consumer).Push(ctx, j[i])
if err != nil {
cancel()
p.events.Push(events.JobEvent{
@@ -544,7 +553,7 @@ func (p *Plugin) Pause(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -552,7 +561,7 @@ func (p *Plugin) Pause(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Pause(ctx, ppl.Name())
+ d.(jobs.Consumer).Pause(ctx, ppl.Name())
}
func (p *Plugin) Resume(pp string) {
@@ -563,7 +572,7 @@ func (p *Plugin) Resume(pp string) {
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ d, ok := p.consumers.Load(ppl.Name())
if !ok {
p.log.Warn("driver for the pipeline not found", "pipeline", pp)
return
@@ -572,7 +581,7 @@ func (p *Plugin) Resume(pp string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
defer cancel()
// redirect call to the underlying driver
- d.Resume(ctx, ppl.Name())
+ d.(jobs.Consumer).Resume(ctx, ppl.Name())
}
// Declare a pipeline.
@@ -586,16 +595,13 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
// jobConstructors contains constructors for the drivers
// we need here to initialize these drivers for the pipelines
- if c, ok := p.jobConstructors[dr]; ok {
+ if _, ok := p.jobConstructors[dr]; ok {
// init the driver from pipeline
- initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ initializedDriver, err := p.jobConstructors[dr].FromPipeline(pipeline, p.events, p.queue)
if err != nil {
return errors.E(op, err)
}
- // add driver to the set of the consumers (name - pipeline name, value - associated driver)
- p.consumers[pipeline.Name()] = initializedDriver
-
// register pipeline for the initialized driver
err = initializedDriver.Register(context.Background(), pipeline)
if err != nil {
@@ -612,10 +618,12 @@ func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
return errors.E(op, err)
}
}
- }
- // save the pipeline
- p.pipelines.Store(pipeline.Name(), pipeline)
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers.Store(pipeline.Name(), initializedDriver)
+ // save the pipeline
+ p.pipelines.Store(pipeline.Name(), pipeline)
+ }
return nil
}
@@ -631,18 +639,24 @@ func (p *Plugin) Destroy(pp string) error {
// type conversion
ppl := pipe.(*pipeline.Pipeline)
- d, ok := p.consumers[ppl.Name()]
+ // delete consumer
+ d, ok := p.consumers.LoadAndDelete(ppl.Name())
if !ok {
return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
}
- // delete consumer
- delete(p.consumers, ppl.Name())
- p.pipelines.Delete(pp)
+ // delete old pipeline
+ p.pipelines.LoadAndDelete(pp)
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
- defer cancel()
+ err := d.(jobs.Consumer).Stop(ctx)
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
- return d.Stop(ctx)
+ cancel()
+ return nil
}
func (p *Plugin) List() []string {
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 94f903d5..d7b93bd1 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -25,7 +25,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
return errors.E(op, errors.Str("empty ID field not allowed"))
}
- err := r.p.Push(r.from(j.GetJob()))
+ err := r.p.Push(from(j.GetJob()))
if err != nil {
return errors.E(op, err)
}
@@ -43,7 +43,7 @@ func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) err
for i := 0; i < l; i++ {
// convert transport entity into domain
// how we can do this quickly
- batch[i] = r.from(j.GetJobs()[i])
+ batch[i] = from(j.GetJobs()[i])
}
err := r.p.PushBatch(batch)
@@ -137,8 +137,8 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
}
// from converts from transport entity to domain
-func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
- headers := map[string][]string{}
+func from(j *jobsv1beta.Job) *job.Job {
+ headers := make(map[string][]string, len(j.GetHeaders()))
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index c6ca96c3..a1144b85 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -104,6 +104,10 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
+ // stop all attached storages
+ for k := range p.storages {
+ p.storages[k].Stop()
+ }
return nil
}
diff --git a/plugins/memcached/config.go b/plugins/memcached/memcachedkv/config.go
index 6d413790..569e2573 100644
--- a/plugins/memcached/config.go
+++ b/plugins/memcached/memcachedkv/config.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
type Config struct {
// Addr is url for memcached, 11211 port is used by default
diff --git a/plugins/memcached/driver.go b/plugins/memcached/memcachedkv/driver.go
index e24747fe..6d5e1802 100644
--- a/plugins/memcached/driver.go
+++ b/plugins/memcached/memcachedkv/driver.go
@@ -1,4 +1,4 @@
-package memcached
+package memcachedkv
import (
"strings"
@@ -246,3 +246,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..47bca0e2 100644
--- a/plugins/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
@@ -5,6 +5,7 @@ import (
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memcached/memcachedkv"
)
const (
@@ -39,7 +40,7 @@ func (s *Plugin) Available() {}
func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
- st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ st, err := memcachedkv.NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/ephemeral/consumer.go b/plugins/memory/memoryjobs/consumer.go
index 8870bb0f..fbdedefe 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
@@ -53,7 +53,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
goroutines: 0,
active: utils.Int64(0),
delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -72,20 +72,16 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- jb := &consumer{
- log: log,
- pq: pq,
- eh: eh,
- goroutines: 0,
- active: utils.Int64(0),
- delayed: utils.Int64(0),
- stopCh: make(chan struct{}, 1),
- }
-
- // initialize a local queue
- jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
-
- return jb, nil
+ return &consumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)),
+ goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
+ stopCh: make(chan struct{}),
+ }, nil
}
func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
@@ -123,6 +119,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -144,12 +141,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -169,8 +167,9 @@ func (c *consumer) Resume(_ context.Context, p string) {
c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
@@ -186,17 +185,28 @@ func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
}
func (c *consumer) Stop(_ context.Context) error {
+ start := time.Now()
pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if atomic.LoadUint32(&c.listeners) > 0 {
- c.stopCh <- struct{}{}
+ select {
+ case c.stopCh <- struct{}{}:
+ default:
+ break
+ }
+
+ for i := 0; i < len(c.localPrefetch); i++ {
+ // drain all jobs from the channel
+ <-c.localPrefetch
}
+ c.localPrefetch = nil
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
+ Driver: pipe.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
@@ -219,10 +229,12 @@ func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
time.Sleep(jj.Options.DelayDuration())
- // send the item after timeout expired
- c.localPrefetch <- jj
-
- atomic.AddUint64(&c.goroutines, ^uint64(0))
+ select {
+ case c.localPrefetch <- jj:
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
+ default:
+ c.log.Warn("can't push job", "error", "local queue closed or full")
+ }
}(msg)
return nil
@@ -247,7 +259,7 @@ func (c *consumer) consume() {
select {
case item, ok := <-c.localPrefetch:
if !ok {
- c.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue closed")
return
}
diff --git a/plugins/ephemeral/item.go b/plugins/memory/memoryjobs/item.go
index 3298424d..f4d62ada 100644
--- a/plugins/ephemeral/item.go
+++ b/plugins/memory/memoryjobs/item.go
@@ -1,4 +1,4 @@
-package ephemeral
+package memoryjobs
import (
"context"
@@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
+ Headers: job.Headers,
Options: &Options{
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,
diff --git a/plugins/memory/config.go b/plugins/memory/memorykv/config.go
index e51d09c5..a8a8993f 100644
--- a/plugins/memory/config.go
+++ b/plugins/memory/memorykv/config.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
// Config is default config for the in-memory driver
type Config struct {
diff --git a/plugins/memory/kv.go b/plugins/memory/memorykv/kv.go
index 68ea7266..9b3e176c 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -1,4 +1,4 @@
-package memory
+package memorykv
import (
"strings"
@@ -20,11 +20,11 @@ type Driver struct {
cfg *Config
}
-func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
+func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_in_memory_driver")
d := &Driver{
- stop: stop,
+ stop: make(chan struct{}),
log: log,
}
@@ -40,7 +40,7 @@ func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configure
return d, nil
}
-func (s *Driver) Has(keys ...string) (map[string]bool, error) {
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in_memory_plugin_has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -52,7 +52,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if _, ok := s.heap.Load(keys[i]); ok {
+ if _, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = true
}
}
@@ -60,7 +60,7 @@ func (s *Driver) Has(keys ...string) (map[string]bool, error) {
return m, nil
}
-func (s *Driver) Get(key string) ([]byte, error) {
+func (d *Driver) Get(key string) ([]byte, error) {
const op = errors.Op("in_memory_plugin_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -68,7 +68,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, errors.E(op, errors.EmptyKey)
}
- if data, exist := s.heap.Load(key); exist {
+ if data, exist := d.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
return data.(*kvv1.Item).Value, nil
@@ -76,7 +76,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -93,7 +93,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
m := make(map[string][]byte, len(keys))
for i := range keys {
- if value, ok := s.heap.Load(keys[i]); ok {
+ if value, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = value.(*kvv1.Item).Value
}
}
@@ -101,7 +101,7 @@ func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
return m, nil
}
-func (s *Driver) Set(items ...*kvv1.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -120,14 +120,14 @@ func (s *Driver) Set(items ...*kvv1.Item) error {
}
}
- s.heap.Store(items[i].Key, items[i])
+ d.heap.Store(items[i].Key, items[i])
}
return nil
}
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*kvv1.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -138,7 +138,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
+ if pItem, ok := d.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
_, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
@@ -148,7 +148,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &kvv1.Item{
+ d.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -159,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]string, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -176,14 +176,14 @@ func (s *Driver) TTL(keys ...string) (map[string]string, error) {
m := make(map[string]string, len(keys))
for i := range keys {
- if item, ok := s.heap.Load(keys[i]); ok {
+ if item, ok := d.heap.Load(keys[i]); ok {
m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
}
-func (s *Driver) Delete(keys ...string) error {
+func (d *Driver) Delete(keys ...string) error {
const op = errors.Op("in_memory_plugin_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -198,34 +198,38 @@ func (s *Driver) Delete(keys ...string) error {
}
for i := range keys {
- s.heap.Delete(keys[i])
+ d.heap.Delete(keys[i])
}
return nil
}
-func (s *Driver) Clear() error {
- s.clearMu.Lock()
- s.heap = sync.Map{}
- s.clearMu.Unlock()
+func (d *Driver) Clear() error {
+ d.clearMu.Lock()
+ d.heap = sync.Map{}
+ d.clearMu.Unlock()
return nil
}
+func (d *Driver) Stop() {
+ d.stop <- struct{}{}
+}
+
// ================================== PRIVATE ======================================
-func (s *Driver) gc() {
- ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+func (d *Driver) gc() {
+ ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second)
+ defer ticker.Stop()
for {
select {
- case <-s.stop:
- ticker.Stop()
+ case <-d.stop:
return
case now := <-ticker.C:
// mutes needed to clear the map
- s.clearMu.RLock()
+ d.clearMu.RLock()
// check every second
- s.heap.Range(func(key, value interface{}) bool {
+ d.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
@@ -237,13 +241,13 @@ func (s *Driver) gc() {
}
if now.After(t) {
- s.log.Debug("key deleted", "key", key)
- s.heap.Delete(key)
+ d.log.Debug("key deleted", "key", key)
+ d.heap.Delete(key)
}
return true
})
- s.clearMu.RUnlock()
+ d.clearMu.RUnlock()
}
}
}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
index fd30eb54..75122571 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -1,4 +1,4 @@
-package memory
+package memorypubsub
import (
"context"
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 7d418a70..515e469a 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,27 +2,29 @@ package memory
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memoryjobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorykv"
+ "github.com/spiral/roadrunner/v2/plugins/memory/memorypubsub"
)
const PluginName string = "memory"
type Plugin struct {
- // heap is user map for the key-value pairs
- stop chan struct{}
-
- log logger.Logger
- cfgPlugin config.Configurer
- drivers uint
+ log logger.Logger
+ cfg config.Configurer
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.cfgPlugin = cfg
- p.stop = make(chan struct{}, 1)
+ p.cfg = cfg
return nil
}
@@ -31,32 +33,36 @@ func (p *Plugin) Serve() chan error {
}
func (p *Plugin) Stop() error {
- if p.drivers > 0 {
- for i := uint(0); i < p.drivers; i++ {
- // send close signal to every driver
- p.stop <- struct{}{}
- }
- }
return nil
}
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// Drivers implementation
+
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key)
+ return memorypubsub.NewPubSubDriver(p.log, key)
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
- st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
+ st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
}
-
- // save driver number to release resources after Stop
- p.drivers++
-
return st, nil
}
-func (p *Plugin) Name() string {
- return PluginName
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return memoryjobs.FromPipeline(pipeline, p.log, e, pq)
}
diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go
deleted file mode 100644
index d0a184d2..00000000
--- a/plugins/redis/clients.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package redis
-
-import (
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
-)
-
-// RedisClient return a client based on the provided section key
-// key sample: kv.some-section.redis
-// kv.redis
-// redis (root)
-func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) {
- const op = errors.Op("redis_get_client")
-
- if !p.cfgPlugin.Has(key) {
- return nil, errors.E(op, errors.Errorf("no such section: %s", key))
- }
-
- cfg := &Config{}
-
- err := p.cfgPlugin.UnmarshalKey(key, cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc, nil
-}
-
-func (p *Plugin) DefaultClient() redis.UniversalClient {
- cfg := &Config{}
- cfg.InitDefaults()
-
- uc := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: cfg.Addrs,
- DB: cfg.DB,
- Username: cfg.Username,
- Password: cfg.Password,
- SentinelPassword: cfg.SentinelPassword,
- MaxRetries: cfg.MaxRetries,
- MinRetryBackoff: cfg.MaxRetryBackoff,
- MaxRetryBackoff: cfg.MaxRetryBackoff,
- DialTimeout: cfg.DialTimeout,
- ReadTimeout: cfg.ReadTimeout,
- WriteTimeout: cfg.WriteTimeout,
- PoolSize: cfg.PoolSize,
- MinIdleConns: cfg.MinIdleConns,
- MaxConnAge: cfg.MaxConnAge,
- PoolTimeout: cfg.PoolTimeout,
- IdleTimeout: cfg.IdleTimeout,
- IdleCheckFrequency: cfg.IdleCheckFreq,
- ReadOnly: cfg.ReadOnly,
- RouteByLatency: cfg.RouteByLatency,
- RouteRandomly: cfg.RouteRandomly,
- MasterName: cfg.MasterName,
- })
-
- return uc
-}
diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go
deleted file mode 100644
index 189b0002..00000000
--- a/plugins/redis/interface.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package redis
-
-import "github.com/go-redis/redis/v8"
-
-// Redis in the redis KV plugin interface
-type Redis interface {
- // RedisClient provides universal redis client
- RedisClient(key string) (redis.UniversalClient, error)
-
- // DefaultClient provide default redis client based on redis defaults
- DefaultClient() redis.UniversalClient
-}
diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go
index b41cb86c..3d062fbb 100644
--- a/plugins/redis/kv/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -248,3 +248,5 @@ func (d *Driver) Clear() error {
return nil
}
+
+func (d *Driver) Stop() {}
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index dfbda154..92dbd6a8 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("sqs_run")
c.Lock()
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.pauseCh <- struct{}{}
}
@@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
diff --git a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
index a4f31290..71b51dce 100644
--- a/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-beanstalk-jobs-err.yaml
@@ -7,9 +7,7 @@ server:
relay_timeout: "20s"
beanstalk:
- # beanstalk address
addr: tcp://127.0.0.1:11300
- # connect timeout
timeout: 10s
logs:
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index bf9f60cc..9813344e 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -45,17 +45,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
prefetch: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
prefetch: 10000
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index 951d6227..5c521c2b 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -14,9 +14,9 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
@@ -68,7 +68,7 @@ func TestJobsInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
&amqp.Plugin{},
)
assert.NoError(t, err)
@@ -154,7 +154,7 @@ func TestJOBSMetrics(t *testing.T) {
&server.Plugin{},
&jobs.Plugin{},
&metrics.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
mockLogger,
)
assert.NoError(t, err)
@@ -204,8 +204,8 @@ func TestJOBSMetrics(t *testing.T) {
time.Sleep(time.Second * 2)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("DeclareEphemeralPipeline", declareMemoryPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeMemoryPipe)
t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_memory_test.go
index 2890aa9d..20cbfb3f 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_memory_test.go
@@ -15,9 +15,9 @@ import (
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -26,12 +26,12 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestEphemeralInit(t *testing.T) {
+func TestMemoryInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-init.yaml",
+ Path: "memory/.rr-memory-init.yaml",
Prefix: "rr",
}
@@ -58,7 +58,7 @@ func TestEphemeralInit(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -112,12 +112,12 @@ func TestEphemeralInit(t *testing.T) {
wg.Wait()
}
-func TestEphemeralDeclare(t *testing.T) {
+func TestMemoryDeclare(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -135,7 +135,7 @@ func TestEphemeralDeclare(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -146,7 +146,7 @@ func TestEphemeralDeclare(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -197,25 +197,25 @@ func TestEphemeralDeclare(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralPauseResume(t *testing.T) {
+func TestMemoryPauseResume(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-pause-resume.yaml",
+ Path: "memory/.rr-memory-pause-resume.yaml",
Prefix: "rr",
}
@@ -231,7 +231,7 @@ func TestEphemeralPauseResume(t *testing.T) {
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
@@ -249,7 +249,7 @@ func TestEphemeralPauseResume(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -301,10 +301,10 @@ func TestEphemeralPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("ephemeralResume", resumePipes("test-local"))
- t.Run("ephemeralPause", pausePipelines("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
+ t.Run("Pause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
- t.Run("ephemeralResume", resumePipes("test-local"))
+ t.Run("Resume", resumePipes("test-local"))
t.Run("pushToEnabledPipe", pushToPipe("test-local"))
time.Sleep(time.Second * 1)
@@ -313,12 +313,12 @@ func TestEphemeralPauseResume(t *testing.T) {
wg.Wait()
}
-func TestEphemeralJobsError(t *testing.T) {
+func TestMemoryJobsError(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-jobs-err.yaml",
+ Path: "memory/.rr-memory-jobs-err.yaml",
Prefix: "rr",
}
@@ -336,7 +336,7 @@ func TestEphemeralJobsError(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -348,7 +348,7 @@ func TestEphemeralJobsError(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -399,25 +399,25 @@ func TestEphemeralJobsError(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", resumePipes("test-3"))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second * 25)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
}
-func TestEphemeralStats(t *testing.T) {
+func TestMemoryStats(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Path: "memory/.rr-memory-declare.yaml",
Prefix: "rr",
}
@@ -435,7 +435,7 @@ func TestEphemeralStats(t *testing.T) {
mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
- mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "memory", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -446,7 +446,7 @@ func TestEphemeralStats(t *testing.T) {
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
- &ephemeral.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -497,22 +497,22 @@ func TestEphemeralStats(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareMemoryPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
- t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ t.Run("PushPipeline", pushToPipeDelayed("test-3", 5))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
out := &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(1))
@@ -520,14 +520,14 @@ func TestEphemeralStats(t *testing.T) {
assert.Equal(t, out.Reserved, int64(0))
time.Sleep(time.Second)
- t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("ConsumePipeline", consumeMemoryPipe)
time.Sleep(time.Second * 7)
out = &jobState.State{}
t.Run("Stats", stats(out))
assert.Equal(t, out.Pipeline, "test-3")
- assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Driver, "memory")
assert.Equal(t, out.Queue, "test-3")
assert.Equal(t, out.Active, int64(0))
@@ -541,13 +541,13 @@ func TestEphemeralStats(t *testing.T) {
wg.Wait()
}
-func declareEphemeralPipe(t *testing.T) {
+func declareMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "ephemeral",
+ "driver": "memory",
"name": "test-3",
"prefetch": "10000",
}}
@@ -557,7 +557,7 @@ func declareEphemeralPipe(t *testing.T) {
assert.NoError(t, err)
}
-func consumeEphemeralPipe(t *testing.T) {
+func consumeMemoryPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
index 726c24ac..726c24ac 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-declare.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-declare.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml b/tests/plugins/jobs/memory/.rr-memory-init.yaml
index 8914dfaa..9ee8afc2 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-init.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-init.yaml
@@ -22,12 +22,12 @@ jobs:
pipelines:
test-1:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
test-2:
- driver: ephemeral
+ driver: memory
priority: 10
prefetch: 10000
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
index 05dc3ffa..05dc3ffa 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-jobs-err.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-jobs-err.yaml
diff --git a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
index e1b76263..1ad48237 100644
--- a/tests/plugins/jobs/ephemeral/.rr-ephemeral-pause-resume.yaml
+++ b/tests/plugins/jobs/memory/.rr-memory-pause-resume.yaml
@@ -25,17 +25,17 @@ jobs:
# list of broker pipelines associated with endpoints
pipelines:
test-local:
- driver: ephemeral
+ driver: memory
priority: 10
pipeline_size: 10000
test-local-2:
- driver: ephemeral
+ driver: memory
priority: 1
pipeline_size: 10000
test-local-3:
- driver: ephemeral
+ driver: memory
priority: 2
pipeline_size: 10000
diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
index f58de3e4..471e5c77 100644
--- a/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-bolt-no-interval.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
boltdb-south:
diff --git a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
index 08b3bfad..b46bcb1c 100644
--- a/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-bolt-perms.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
boltdb-south:
diff --git a/tests/plugins/kv/configs/.rr-kv-init.yaml b/tests/plugins/kv/configs/.rr-kv-init.yaml
index a13b591c..6407c7ad 100644
--- a/tests/plugins/kv/configs/.rr-kv-init.yaml
+++ b/tests/plugins/kv/configs/.rr-kv-init.yaml
@@ -1,6 +1,9 @@
rpc:
listen: tcp://127.0.0.1:6001
+logs:
+ mode: development
+ level: error
kv:
default:
@@ -25,6 +28,3 @@ kv:
memcached:
driver: memcached
addr: [ "127.0.0.1:11211" ]
-
-# redis:
-# driver: redis
diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go
deleted file mode 100644
index 68da1394..00000000
--- a/tests/plugins/redis/plugin1.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package redis
-
-import (
- "context"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- redisPlugin "github.com/spiral/roadrunner/v2/plugins/redis"
-)
-
-type Plugin1 struct {
- redisClient redis.UniversalClient
-}
-
-func (p *Plugin1) Init(redis redisPlugin.Redis) error {
- var err error
- p.redisClient, err = redis.RedisClient("redis")
-
- return err
-}
-
-func (p *Plugin1) Serve() chan error {
- const op = errors.Op("plugin1 serve")
- errCh := make(chan error, 1)
- p.redisClient.Set(context.Background(), "foo", "bar", time.Minute)
-
- stringCmd := p.redisClient.Get(context.Background(), "foo")
- data, err := stringCmd.Result()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- if data != "bar" {
- errCh <- errors.E(op, errors.Str("no such key"))
- return errCh
- }
-
- return errCh
-}
-
-func (p *Plugin1) Stop() error {
- return nil
-}
diff --git a/tests/plugins/redis/redis_plugin_test.go b/tests/plugins/redis/redis_plugin_test.go
deleted file mode 100644
index 1b84e339..00000000
--- a/tests/plugins/redis/redis_plugin_test.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package redis
-
-import (
- "fmt"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "testing"
-
- "github.com/alicebob/miniredis/v2"
- "github.com/golang/mock/gomock"
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/redis"
- "github.com/spiral/roadrunner/v2/tests/mocks"
- "github.com/stretchr/testify/assert"
-)
-
-func redisConfig(port string) string {
- cfg := `
-redis:
- addrs:
- - '127.0.0.1:%s'
- master_name: ''
- username: ''
- password: ''
- db: 0
- sentinel_password: ''
- route_by_latency: false
- route_randomly: false
- dial_timeout: 0
- max_retries: 1
- min_retry_backoff: 0
- max_retry_backoff: 0
- pool_size: 0
- min_idle_conns: 0
- max_conn_age: 0
- read_timeout: 0
- write_timeout: 0
- pool_timeout: 0
- idle_timeout: 0
- idle_check_freq: 0
- read_only: false
-`
- return fmt.Sprintf(cfg, port)
-}
-
-func TestRedisInit(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
- if err != nil {
- t.Fatal(err)
- }
-
- s, err := miniredis.Run()
- assert.NoError(t, err)
-
- c := redisConfig(s.Port())
-
- cfg := &config.Viper{}
- cfg.Type = "yaml"
- cfg.ReadInCfg = []byte(c)
-
- controller := gomock.NewController(t)
- mockLogger := mocks.NewMockLogger(controller)
-
- err = cont.RegisterAll(
- cfg,
- mockLogger,
- &redis.Plugin{},
- &Plugin1{},
- )
- assert.NoError(t, err)
-
- err = cont.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ch, err := cont.Serve()
- assert.NoError(t, err)
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- defer wg.Done()
- for {
- select {
- case e := <-ch:
- assert.Fail(t, "error", e.Error.Error())
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- case <-sig:
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- case <-stopCh:
- // timeout
- err = cont.Stop()
- if err != nil {
- assert.FailNow(t, "error", err.Error())
- }
- return
- }
- }
- }()
-
- stopCh <- struct{}{}
- wg.Wait()
-}
diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
index 5ab359d3..d256aad7 100644
--- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
+++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
@@ -1,3 +1,8 @@
logs:
mode: development
- level: error \ No newline at end of file
+ level: panic
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: panic
diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml
index 67d935e3..d6aaa7c6 100644
--- a/tests/plugins/rpc/configs/.rr.yaml
+++ b/tests/plugins/rpc/configs/.rr.yaml
@@ -1,5 +1,11 @@
rpc:
listen: tcp://127.0.0.1:6001
+
logs:
mode: development
- level: error \ No newline at end of file
+ level: panic
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: panic