author    Valery Piashchynski <[email protected]>  2021-08-18 17:26:42 +0300
committer GitHub <[email protected]>  2021-08-18 17:26:42 +0300
commit    324407b3e2d779143be65872993c4d091abb1d38 (patch)
tree      e6f0bd64241ab2d4dc05809128c8e8d7d74cbcc4
parent    a5435be8ab58bd23f1c2d3afd4484dd1d86b6002 (diff)
parent    eb70b89cb2f23ccd44b91bbcac7438a05a40c801 (diff)
#764: feat(stat): `job` plugin drivers statistic
-rw-r--r--  .github/workflows/linters.yml | 2
-rw-r--r--  .github/workflows/linux.yml | 2
-rwxr-xr-x  .gitignore | 2
-rwxr-xr-x  Makefile | 44
-rw-r--r--  common/jobs/interface.go | 7
-rw-r--r--  common/pubsub/interface.go | 4
-rw-r--r--  go.mod | 56
-rw-r--r--  go.sum | 8
-rw-r--r--  pkg/bst/bst.go | 2
-rw-r--r--  pkg/events/jobs_events.go | 8
-rwxr-xr-x  pkg/pool/static_pool.go | 56
-rwxr-xr-x  pkg/pool/static_pool_test.go | 22
-rwxr-xr-x  pkg/pool/supervisor_pool.go | 2
-rw-r--r--  pkg/pool/supervisor_test.go | 8
-rw-r--r--  pkg/state/job/state.go | 19
-rw-r--r--  pkg/state/process/state.go (renamed from pkg/process/state.go) | 0
-rw-r--r--  pkg/worker_handler/errors.go | 1
-rw-r--r--  pkg/worker_handler/errors_windows.go | 1
-rw-r--r--  pkg/worker_watcher/container/channel/vec.go | 19
-rw-r--r--  plugins/gzip/plugin.go | 1
-rw-r--r--  plugins/http/plugin.go | 2
-rw-r--r--  plugins/informer/interface.go | 19
-rw-r--r--  plugins/informer/plugin.go | 36
-rw-r--r--  plugins/informer/rpc.go | 11
-rw-r--r--  plugins/jobs/drivers/amqp/consumer.go | 194
-rw-r--r--  plugins/jobs/drivers/amqp/item.go | 15
-rw-r--r--  plugins/jobs/drivers/beanstalk/connection.go | 25
-rw-r--r--  plugins/jobs/drivers/beanstalk/consumer.go | 47
-rw-r--r--  plugins/jobs/drivers/ephemeral/consumer.go | 220
-rw-r--r--  plugins/jobs/drivers/ephemeral/item.go | 22
-rw-r--r--  plugins/jobs/drivers/sqs/consumer.go | 65
-rw-r--r--  plugins/jobs/metrics.go | 91
-rw-r--r--  plugins/jobs/plugin.go | 134
-rw-r--r--  plugins/jobs/response_protocol.md | 2
-rw-r--r--  plugins/jobs/rpc.go | 32
-rw-r--r--  plugins/memory/pubsub.go | 34
-rw-r--r--  plugins/redis/jobs/config.go | 34
-rw-r--r--  plugins/redis/jobs/consumer.go | 1
-rw-r--r--  plugins/redis/jobs/item.go | 1
-rw-r--r--  plugins/redis/kv/config.go | 34
-rw-r--r--  plugins/redis/kv/kv.go (renamed from plugins/redis/kv.go) | 2
-rw-r--r--  plugins/redis/plugin.go | 6
-rw-r--r--  plugins/redis/pubsub/channel.go (renamed from plugins/redis/channel.go) | 6
-rw-r--r--  plugins/redis/pubsub/config.go | 34
-rw-r--r--  plugins/redis/pubsub/pubsub.go (renamed from plugins/redis/pubsub.go) | 12
-rw-r--r--  plugins/reload/plugin.go | 16
-rw-r--r--  plugins/resetter/interface.go | 16
-rw-r--r--  plugins/resetter/plugin.go | 43
-rw-r--r--  plugins/resetter/rpc.go | 17
-rw-r--r--  plugins/server/command.go | 2
-rw-r--r--  plugins/server/plugin.go | 2
-rw-r--r--  plugins/service/plugin.go | 2
-rw-r--r--  plugins/websockets/plugin.go | 31
-rw-r--r--  proto/jobs/v1beta/jobs.pb.go | 219
-rw-r--r--  proto/jobs/v1beta/jobs.proto | 15
-rw-r--r--  tests/env/Dockerfile-beanstalkd.yaml | 8
-rw-r--r--  tests/env/docker-compose.yaml | 7
-rw-r--r--  tests/plugins/broadcast/broadcast_plugin_test.go | 36
-rw-r--r--  tests/plugins/broadcast/plugins/plugin1.go | 31
-rw-r--r--  tests/plugins/broadcast/plugins/plugin2.go | 31
-rw-r--r--  tests/plugins/broadcast/plugins/plugin3.go | 33
-rw-r--r--  tests/plugins/broadcast/plugins/plugin4.go | 34
-rw-r--r--  tests/plugins/broadcast/plugins/plugin5.go | 34
-rw-r--r--  tests/plugins/broadcast/plugins/plugin6.go | 34
-rw-r--r--  tests/plugins/http/http_plugin_test.go | 2
-rw-r--r--  tests/plugins/informer/informer_test.go | 2
-rw-r--r--  tests/plugins/informer/test_plugin.go | 2
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-metrics.yaml | 27
-rw-r--r--  tests/plugins/jobs/helpers.go | 51
-rw-r--r--  tests/plugins/jobs/jobs_amqp_test.go | 185
-rw-r--r--  tests/plugins/jobs/jobs_beanstalk_test.go | 141
-rw-r--r--  tests/plugins/jobs/jobs_ephemeral_test.go | 155
-rw-r--r--  tests/plugins/jobs/jobs_general_test.go | 125
-rw-r--r--  tests/plugins/jobs/jobs_sqs_test.go | 148
-rw-r--r--  tests/plugins/jobs/jobs_with_toxics_test.go | 23
-rw-r--r--  tests/plugins/metrics/docker-compose.yml | 7
-rw-r--r--  tests/plugins/reload/configs/.rr-reload-2.yaml | 2
-rw-r--r--  tests/plugins/reload/reload_plugin_test.go | 24
-rw-r--r--  tests/plugins/resetter/resetter_test.go | 5
-rw-r--r--  tests/plugins/service/service_plugin_test.go | 1
-rw-r--r--  tests/supervised.php | 14
-rw-r--r--  tests/temporal-worker.php (renamed from tests/worker.php) | 0
-rwxr-xr-x  utils/isolate.go | 1
-rwxr-xr-x  utils/isolate_win.go | 1
-rwxr-xr-x  utils/network.go | 1
-rwxr-xr-x  utils/network_windows.go | 1
86 files changed, 2271 insertions(+), 601 deletions(-)
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index b5e43b8e..99cca039 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -13,6 +13,6 @@ jobs:
- name: Run linter
uses: golangci/golangci-lint-action@v2 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
- version: v1.41 # without patch version
+ version: v1.42 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index fcf2c611..0224de69 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -23,7 +23,7 @@ jobs:
fail-fast: true
matrix:
php: [ "7.4", "8.0" ]
- go: [ "1.16" ]
+ go: [ "1.17" ]
os: [ 'ubuntu-latest' ]
steps:
- name: Set up Go ${{ matrix.go }}
diff --git a/.gitignore b/.gitignore
index beb3f885..86a94bda 100755
--- a/.gitignore
+++ b/.gitignore
@@ -12,7 +12,7 @@
unit_tests
unit_tests_copied
dir1
-coverage
+coverage-ci
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
diff --git a/Makefile b/Makefile
index 10f09f33..5cba50c5 100755
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,13 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/job
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
@@ -34,10 +38,6 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/resetter.txt -covermode=atomic ./tests/plugins/resetter
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.txt -covermode=atomic ./tests/plugins/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/kv_plugin.txt -covermode=atomic ./tests/plugins/kv
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
docker-compose -f tests/env/docker-compose.yaml down
@@ -74,37 +74,3 @@ test: ## Run application tests
go test -v -race -tags=debug ./plugins/websockets
go test -v -race -tags=debug ./tests/plugins/jobs
docker-compose -f tests/env/docker-compose.yaml down
-
-testGo1.17rc1: ## Run application tests
- docker-compose -f tests/env/docker-compose.yaml up -d
- go1.17rc1 test -v -race -tags=debug ./pkg/transport/pipe
- go1.17rc1 test -v -race -tags=debug ./pkg/transport/socket
- go1.17rc1 test -v -race -tags=debug ./pkg/pool
- go1.17rc1 test -v -race -tags=debug ./pkg/worker
- go1.17rc1 test -v -race -tags=debug ./pkg/worker_watcher
- go1.17rc1 test -v -race -tags=debug ./pkg/bst
- go1.17rc1 test -v -race -tags=debug ./pkg/priority_queue
- go1.17rc1 test -v -race -tags=debug ./plugins/jobs/job
- go1.17rc1 test -v -race -tags=debug ./plugins/jobs/pipeline
- go1.17rc1 test -v -race -tags=debug ./plugins/http/config
- go1.17rc1 test -v -race -tags=debug ./plugins/server
- go1.17rc1 test -v -race -tags=debug ./plugins/websockets
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/http
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/informer
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/reload
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/server
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/service
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/status
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/config
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/gzip
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/headers
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/logger
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/metrics
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/redis
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/resetter
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/rpc
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/kv
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/websockets
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/broadcast
- go1.17rc1 test -v -race -tags=debug ./tests/plugins/jobs
- docker-compose -f tests/env/docker-compose.yaml down
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
index c957df2b..4b5ff70e 100644
--- a/common/jobs/interface.go
+++ b/common/jobs/interface.go
@@ -5,11 +5,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
)
-// Consumer todo naming
+// Consumer represents a single jobs driver interface
type Consumer interface {
Push(ctx context.Context, job *job.Job) error
Register(ctx context.Context, pipeline *pipeline.Pipeline) error
@@ -18,8 +19,12 @@ type Consumer interface {
Pause(ctx context.Context, pipeline string)
Resume(ctx context.Context, pipeline string)
+
+	// State provides information about the driver state
+ State(ctx context.Context) (*jobState.State, error)
}
+// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
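
For orientation, a minimal sketch of what the new State contract asks of a driver. The noopDriver type and all of its fields are illustrative assumptions, not part of this commit:

    package main

    import (
    	"context"
    	"fmt"
    	"sync/atomic"

    	jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
    )

    // noopDriver is a hypothetical driver: counters are kept atomically
    // and snapshotted on demand by State.
    type noopDriver struct {
    	pipeline  string
    	listeners uint32
    	active    int64
    	delayed   int64
    }

    func (d *noopDriver) State(_ context.Context) (*jobState.State, error) {
    	return &jobState.State{
    		Pipeline: d.pipeline,
    		Driver:   "noop",
    		Queue:    d.pipeline,
    		Active:   atomic.LoadInt64(&d.active),
    		Delayed:  atomic.LoadInt64(&d.delayed),
    		Ready:    atomic.LoadUint32(&d.listeners) > 0,
    	}, nil
    }

    func main() {
    	st, _ := (&noopDriver{pipeline: "local", listeners: 1}).State(context.Background())
    	fmt.Printf("%+v\n", *st)
    }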
diff --git a/common/pubsub/interface.go b/common/pubsub/interface.go
index 06252d70..5b69d577 100644
--- a/common/pubsub/interface.go
+++ b/common/pubsub/interface.go
@@ -1,5 +1,7 @@
package pubsub
+import "context"
+
/*
This interface is in BETA. It might be changed.
*/
@@ -45,7 +47,7 @@ type Publisher interface {
// Reader interface should return next message
type Reader interface {
- Next() (*Message, error)
+ Next(ctx context.Context) (*Message, error)
}
// Constructor is a special pub-sub interface made to return a constructed PubSub type
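
Cancelling the context now unblocks a subscriber stuck in Next, which is what makes a clean shutdown possible. A minimal sketch of the intended call site (readLoop and the output channel are illustrative):

    package main

    import (
    	"context"

    	"github.com/spiral/roadrunner/v2/common/pubsub"
    )

    // readLoop drains a Reader until its context is cancelled; before this
    // change the goroutine could block in Next forever during shutdown.
    func readLoop(ctx context.Context, r pubsub.Reader, out chan<- *pubsub.Message) {
    	for {
    		msg, err := r.Next(ctx)
    		if err != nil {
    			return // context cancelled (shutdown) or transport failure
    		}
    		out <- msg
    	}
    }

    func main() {}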
diff --git a/go.mod b/go.mod
index 49731f8e..0daf6abe 100644
--- a/go.mod
+++ b/go.mod
@@ -1,10 +1,9 @@
module github.com/spiral/roadrunner/v2
-go 1.16
+go 1.17
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
- github.com/StackExchange/wmi v1.2.1 // indirect
github.com/alicebob/miniredis/v2 v2.15.1
// ========= AWS SDK v2
github.com/aws/aws-sdk-go-v2 v1.8.0
@@ -34,7 +33,6 @@ require (
github.com/spiral/goridge/v3 v3.2.0
// ===========
github.com/stretchr/testify v1.7.0
- github.com/tklauser/go-sysconf v0.3.7 // indirect
github.com/valyala/tcplisten v1.0.0
github.com/yookoala/gofast v0.6.0
go.etcd.io/bbolt v1.3.6
@@ -42,7 +40,57 @@ require (
go.uber.org/zap v1.19.0
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
- golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e
+ golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71
google.golang.org/protobuf v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
+
+require (
+ github.com/StackExchange/wmi v1.2.1 // indirect
+ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
+ github.com/andybalholm/brotli v1.0.2 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.0 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/ini v1.2.0 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.3.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.6.1 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.1 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/fsnotify/fsnotify v1.4.9 // indirect
+ github.com/go-ole/go-ole v1.2.5 // indirect
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/hashicorp/hcl v1.0.0 // indirect
+ github.com/magiconair/properties v1.8.5 // indirect
+ github.com/mattn/go-colorable v0.1.8 // indirect
+ github.com/mattn/go-isatty v0.0.12 // indirect
+ github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
+ github.com/mitchellh/mapstructure v1.4.1 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
+ github.com/pelletier/go-toml v1.9.3 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/prometheus/client_model v0.2.0 // indirect
+ github.com/prometheus/common v0.26.0 // indirect
+ github.com/prometheus/procfs v0.6.0 // indirect
+ github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c // indirect
+ github.com/spf13/afero v1.6.0 // indirect
+ github.com/spf13/cast v1.3.1 // indirect
+ github.com/spf13/jwalterweatherman v1.1.0 // indirect
+ github.com/spf13/pflag v1.0.5 // indirect
+ github.com/subosito/gotenv v1.2.0 // indirect
+ github.com/tklauser/go-sysconf v0.3.8 // indirect
+ github.com/tklauser/numcpus v0.2.3 // indirect
+ github.com/valyala/bytebufferpool v1.0.0 // indirect
+ github.com/valyala/fasthttp v1.26.0 // indirect
+ github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect
+ github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
+ go.uber.org/atomic v1.7.0 // indirect
+ golang.org/x/text v0.3.6 // indirect
+ golang.org/x/tools v0.1.2 // indirect
+ google.golang.org/appengine v1.6.7 // indirect
+ gopkg.in/ini.v1 v1.62.0 // indirect
+ gopkg.in/yaml.v2 v2.4.0 // indirect
+ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
+)
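
The new second require block is not hand-maintained: once the go directive says 1.17, `go mod tidy` splits indirect dependencies into their own block as part of module graph pruning. Regenerating the layout is a single command (shown for context, not part of the commit):

    go mod tidy -go=1.17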
diff --git a/go.sum b/go.sum
index 560bd9c7..bcd4166f 100644
--- a/go.sum
+++ b/go.sum
@@ -388,8 +388,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
-github.com/tklauser/go-sysconf v0.3.7 h1:HT7h4+536gjqeq1ZIJPgOl1rg1XFatQGVZWp7Py53eg=
-github.com/tklauser/go-sysconf v0.3.7/go.mod h1:JZIdXh4RmBvZDBZ41ld2bGxRV3n4daiiqA3skYhAoQ4=
+github.com/tklauser/go-sysconf v0.3.8 h1:41Nq9J+pxKud4IQ830J5LlS5nl67dVQC7AuisUooaOU=
+github.com/tklauser/go-sysconf v0.3.8/go.mod h1:z4zYWRS+X53WUKtBcmDg1comV3fPhdQnzasnIHUoLDU=
github.com/tklauser/numcpus v0.2.3 h1:nQ0QYpiritP6ViFhrKYsiv6VVxOpum2Gks5GhnJbS/8=
github.com/tklauser/numcpus v0.2.3/go.mod h1:vpEPS/JC+oZGGQ/My/vJnNsvMDQL6PwOqt8dsCw5j+E=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -607,8 +607,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
-golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71 h1:ikCpsnYR+Ew0vu99XlDp55lGgDJdIMx3f4a18jfse/s=
+golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go
index f8426b12..dab9346c 100644
--- a/pkg/bst/bst.go
+++ b/pkg/bst/bst.go
@@ -88,7 +88,7 @@ func (b *BST) Remove(uuid string, topic string) {
b.removeHelper(uuid, topic, nil)
}
-func (b *BST) removeHelper(uuid string, topic string, parent *BST) { //nolint:gocognit
+func (b *BST) removeHelper(uuid string, topic string, parent *BST) {
curr := b
for curr != nil {
if topic < curr.topic { //nolint:gocritic
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
index 300f6748..f65ede67 100644
--- a/pkg/events/jobs_events.go
+++ b/pkg/events/jobs_events.go
@@ -6,7 +6,7 @@ import (
const (
// EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK = iota + 12000
+ EventPushOK J = iota + 12000
// EventPushError caused when job can not be registered.
EventPushError
@@ -58,6 +58,8 @@ func (ev J) String() string {
return "EventPipeError"
case EventDriverReady:
return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
}
return UnknownEventType
}
@@ -67,16 +69,12 @@ type JobEvent struct {
Event J
// String is job id.
ID string
-
// Pipeline name
Pipeline string
-
// Associated driver name (amqp, ephemeral, etc)
Driver string
-
// Error for the jobs/pipes errors
Error error
-
// event timings
Start time.Time
Elapsed time.Duration
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 051e7a8a..3eb0714f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them to complete the task).
+// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
}
@@ -250,34 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
switch {
case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
case errors.Is(errors.SoftJob, err):
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // TODO suspicious logic, redesign
- err = sp.ww.Allocate()
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
- }
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+		// if the max jobs limit is exceeded
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
w.State().Set(worker.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- } else {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Release(w)
+
+ return nil, err
}
- }
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(op, err, errS)
- }
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
- return nil, errors.E(op, err)
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
}
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b72b8c32..cb6578a8 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -227,9 +227,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
AddListeners(listener),
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
@@ -290,10 +290,12 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+	// wait for the process to become ready
+ time.Sleep(time.Second)
+
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
@@ -326,10 +328,12 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+
+	// wait for the process to become ready
+ time.Sleep(time.Second)
assert.Len(t, p.Workers(), 0)
var lastPID string
@@ -366,10 +370,11 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
},
)
assert.NoError(t, err)
- defer p.Destroy(ctx)
-
assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
+
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
@@ -460,6 +465,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
for i := range p.Workers() {
p.Workers()[i].State().Set(worker.StateErrored)
}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index bdaeade1..e6b2bd7c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 0702a71f..d1b24574 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -39,6 +39,8 @@ func TestSupervisedPool_Exec(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
+
pidBefore := p.Workers()[0].Pid()
for i := 0; i < 100; i++ {
@@ -63,7 +65,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
ctx := context.Background()
p, err := Initialize(
ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") },
pipe.NewPipeFactory(),
cfgSupervised,
)
@@ -71,8 +73,10 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
+ time.Sleep(time.Second)
+
for i := 0; i < 100; i++ {
- time.Sleep(time.Millisecond * 100)
+ time.Sleep(time.Millisecond * 500)
_, err = p.Exec(&payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
diff --git a/pkg/state/job/state.go b/pkg/state/job/state.go
new file mode 100644
index 00000000..56050084
--- /dev/null
+++ b/pkg/state/job/state.go
@@ -0,0 +1,19 @@
+package job
+
+// State represents job's state
+type State struct {
+ // Pipeline name
+ Pipeline string
+ // Driver name
+ Driver string
+ // Queue name (tube for the beanstalk)
+ Queue string
+ // Active jobs which are consumed from the driver but not handled by the PHP worker yet
+ Active int64
+ // Delayed jobs
+ Delayed int64
+ // Reserved jobs which are in the driver but not consumed yet
+ Reserved int64
+	// Ready is the pipeline status: true - ready (consuming), false - paused
+ Ready bool
+}
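
Purely for illustration, the kind of snapshot a driver might return for a paused pipeline (all values invented):

    package main

    import (
    	"fmt"

    	"github.com/spiral/roadrunner/v2/pkg/state/job"
    )

    func main() {
    	st := job.State{
    		Pipeline: "test-1",
    		Driver:   "amqp",
    		Queue:    "default",
    		Active:   10,    // consumed from the driver, not yet handled by a PHP worker
    		Delayed:  2,     // waiting for a delay to expire
    		Reserved: 0,     // still in the driver, not consumed
    		Ready:    false, // paused: no active listeners
    	}
    	fmt.Printf("%+v\n", st)
    }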
diff --git a/pkg/process/state.go b/pkg/state/process/state.go
index bfc3a287..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/state/process/state.go
diff --git a/pkg/worker_handler/errors.go b/pkg/worker_handler/errors.go
index 5fa8e64e..c3352a52 100644
--- a/pkg/worker_handler/errors.go
+++ b/pkg/worker_handler/errors.go
@@ -1,3 +1,4 @@
+//go:build !windows
// +build !windows
package handler
diff --git a/pkg/worker_handler/errors_windows.go b/pkg/worker_handler/errors_windows.go
index 390cc7d1..3c6c2186 100644
--- a/pkg/worker_handler/errors_windows.go
+++ b/pkg/worker_handler/errors_windows.go
@@ -1,3 +1,4 @@
+//go:build windows
// +build windows
package handler
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 51093978..7fb65a92 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -39,6 +39,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
// because in that case, workers in the v.workers channel can be TTL-ed and killed
// but presenting in the channel
default:
+ // Stop Pop operations
v.Lock()
defer v.Unlock()
@@ -48,19 +49,29 @@ func (v *Vec) Push(w worker.BaseProcess) {
2. Violated Get <-> Release operation (how ??)
*/
for i := uint64(0); i < v.len; i++ {
+			/*
+				We need to drain the vector until we find a worker in the Invalid/Killing/Killed/etc. state.
+			*/
wrk := <-v.workers
switch wrk.State().Value() {
- // skip good states
+ // skip good states, put worker back
case worker.StateWorking, worker.StateReady:
// put the worker back
// generally, while send and receive operations are concurrent (from the channel), the channel behaves
// like a FIFO, but when re-sending from the same goroutine it behaves like a FILO
v.workers <- wrk
continue
+ /*
+ Bad states are here.
+ */
default:
// kill the current worker (just to be sure it's dead)
- _ = wrk.Kill()
- // replace with the new one
+ if wrk != nil {
+ _ = wrk.Kill()
+ }
+ // replace with the new one and return from the loop
+ // new worker can be ttl-ed at this moment, it's possible to replace TTL-ed worker with new TTL-ed worker
+ // But this case will be handled in the worker_watcher::Get
v.workers <- w
return
}
@@ -78,7 +89,7 @@ func (v *Vec) Pop(ctx context.Context) (worker.BaseProcess, error) {
}
*/
- if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ if atomic.LoadUint64(&v.destroy) == 1 {
return nil, errors.E(errors.WatcherStopped)
}
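
The replaced CompareAndSwapUint64(&v.destroy, 1, 1) only "reads" the flag by attempting to write back the same value; a plain load expresses the read-only intent and skips the read-modify-write. A standalone comparison of the two forms:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    func main() {
    	var destroy uint64 = 1

    	// CAS(x, 1, 1) succeeds exactly when the value is already 1, so it
    	// behaves as a read; but it is still a read-modify-write operation.
    	viaCAS := atomic.CompareAndSwapUint64(&destroy, 1, 1)

    	// A plain atomic load is a read-only check of the destroy flag.
    	viaLoad := atomic.LoadUint64(&destroy) == 1

    	fmt.Println(viaCAS, viaLoad) // true true
    }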
diff --git a/plugins/gzip/plugin.go b/plugins/gzip/plugin.go
index a957878c..05f1eb63 100644
--- a/plugins/gzip/plugin.go
+++ b/plugins/gzip/plugin.go
@@ -10,7 +10,6 @@ const PluginName = "gzip"
type Plugin struct{}
-// Init needed for the Endure
func (g *Plugin) Init() error {
return nil
}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 2ee83384..dc887f87 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -10,7 +10,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index bbc1a048..9277b85b 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -1,7 +1,10 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
/*
@@ -9,17 +12,23 @@ Informer plugin should not receive any other plugin in the Init or via Collects
Because Availabler implementation should present in every plugin
*/
+// Statistic interfaces ==============
+
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
Workers() []*process.State
}
+// JobsStat interface provides statistics for the jobs plugin
+type JobsStat interface {
+	// JobsState returns a slice with information about the attached drivers
+ JobsState(ctx context.Context) ([]*job.State, error)
+}
+
+// Statistic interfaces end ============
+
// Availabler interface should be implemented by every plugin which wishes to report to the PHP worker that it is available in the RR runtime
type Availabler interface {
// Available method needed to collect all plugins which are available in the runtime.
Available()
}
-
-type JobsStat interface {
- Stat()
-}
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index c613af58..87180be5 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -1,20 +1,30 @@
package informer
import (
+ "context"
+
endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "informer"
type Plugin struct {
+ log logger.Logger
+
+ withJobs map[string]JobsStat
withWorkers map[string]Informer
available map[string]Availabler
}
-func (p *Plugin) Init() error {
+func (p *Plugin) Init(log logger.Logger) error {
p.available = make(map[string]Availabler)
p.withWorkers = make(map[string]Informer)
+ p.withJobs = make(map[string]JobsStat)
+
+ p.log = log
return nil
}
@@ -28,11 +38,29 @@ func (p *Plugin) Workers(name string) []*process.State {
return svc.Workers()
}
+// Jobs provides information about jobs for the registered plugin using jobs
+func (p *Plugin) Jobs(name string) []*job.State {
+ svc, ok := p.withJobs[name]
+ if !ok {
+ return nil
+ }
+
+ st, err := svc.JobsState(context.Background())
+ if err != nil {
+ p.log.Info("jobs stat", "error", err)
+ // skip errors here
+ return nil
+ }
+
+ return st
+}
+
// Collects declares services to be collected.
func (p *Plugin) Collects() []interface{} {
return []interface{}{
p.CollectPlugins,
p.CollectWorkers,
+ p.CollectJobs,
}
}
@@ -46,6 +74,10 @@ func (p *Plugin) CollectWorkers(name endure.Named, r Informer) {
p.withWorkers[name.Name()] = r
}
+func (p *Plugin) CollectJobs(name endure.Named, j JobsStat) {
+ p.withJobs[name.Name()] = j
+}
+
// Name of the service.
func (p *Plugin) Name() string {
return PluginName
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 02254865..478d3227 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -1,7 +1,8 @@
package informer
import (
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
)
type rpc struct {
@@ -10,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
- // Workers is list of workers.
+	// Workers is a list of workers.
Workers []*process.State `json:"workers"`
}
@@ -29,7 +30,6 @@ func (rpc *rpc) List(_ bool, list *[]string) error {
func (rpc *rpc) Workers(service string, list *WorkerList) error {
workers := rpc.srv.Workers(service)
if workers == nil {
- list = nil
return nil
}
@@ -39,6 +39,11 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+func (rpc *rpc) Jobs(service string, out *[]*job.State) error {
+ *out = rpc.srv.Jobs(service)
+ return nil
+}
+
// sort.Sort
func (w *WorkerList) Len() int {
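
With the collector and RPC method above, driver snapshots become reachable from any RPC client. A hedged sketch of a caller; the address and the "jobs" service name are assumptions, not taken from this commit:

    package main

    import (
    	"fmt"
    	"net"
    	"net/rpc"

    	goridgeRPC "github.com/spiral/goridge/v3/pkg/rpc"
    	"github.com/spiral/roadrunner/v2/pkg/state/job"
    )

    func main() {
    	// assumes an RR instance with the rpc plugin listening on 127.0.0.1:6001
    	conn, err := net.Dial("tcp", "127.0.0.1:6001")
    	if err != nil {
    		panic(err)
    	}
    	client := rpc.NewClientWithCodec(goridgeRPC.NewClientCodec(conn))

    	var out []*job.State
    	// "jobs" is the name of the plugin whose drivers we want to inspect
    	if err := client.Call("informer.Jobs", "jobs", &out); err != nil {
    		panic(err)
    	}

    	for _, st := range out {
    		fmt.Printf("%s (%s): active=%d delayed=%d reserved=%d ready=%v\n",
    			st.Pipeline, st.Driver, st.Active, st.Delayed, st.Reserved, st.Ready)
    	}
    }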
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 429953e1..95df02ec 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -12,10 +12,12 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type JobConsumer struct {
@@ -50,9 +52,8 @@ type JobConsumer struct {
multipleAck bool
requeueOnFail bool
- delayCache map[string]struct{}
-
listeners uint32
+ delayed *int64
stopCh chan struct{}
}
@@ -99,8 +100,8 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
stopCh: make(chan struct{}),
// TODO to config
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
@@ -169,7 +170,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
consumeID: uuid.NewString(),
stopCh: make(chan struct{}),
retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
+ delayed: utils.Int64(0),
publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
@@ -231,81 +232,6 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-// handleItem
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-j.publishChan:
- // return the channel back
- defer func() {
- j.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("amqp_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- j.delayCache[tmpQ] = struct{}{}
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
j.pipeline.Store(p)
return nil
@@ -361,6 +287,35 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-j.publishChan:
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(j.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
func (j *JobConsumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -470,3 +425,84 @@ func (j *JobConsumer) Stop(context.Context) error {
})
return nil
}
+
+// handleItem publishes the message; delayed messages are routed through a temporary queue that dead-letters back into the main exchange
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(j.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(j.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
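
Throughout the driver the delayed gauge is decremented with atomic.AddInt64(j.delayed, ^int64(0)): the bitwise complement of zero is -1, so for a signed counter this is equivalent to adding -1; the ^x spelling mirrors the idiom needed for unsigned counters. In isolation:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    func main() {
    	delayed := new(int64)

    	atomic.AddInt64(delayed, 1) // a job was scheduled with a delay

    	// ^int64(0) == -1, so this decrements; for uint64 counters the
    	// same trick (^uint64(0)) is the only way to subtract one.
    	atomic.AddInt64(delayed, ^int64(0)) // the delayed job was acked or failed

    	fmt.Println(atomic.LoadInt64(delayed)) // prints 0
    }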
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
import (
"context"
"fmt"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
-
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
multipleAsk bool
requeue bool
}
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
+ item.Options.delayed = j.delayed
// requeue func
item.Options.requeueFn = j.handleItem
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 32ca4188..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -67,7 +67,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, errN
+ return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry put only when we redialed
return cp.t.Put(body, pri, delay, ttr)
@@ -92,7 +92,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return 0, nil, errN
+ return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Reserve only when we redialed
return cp.ts.Reserve(reserveTimeout)
@@ -102,7 +102,7 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
return id, body, nil
}
-func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()
@@ -111,7 +111,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
// errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
- return errN
+ return errors.Errorf("err: %s\nerr redial: %s", err, errN)
} else {
// retry Delete only when we redialed
return cp.conn.Delete(id)
@@ -120,6 +120,23 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
return nil
}
+func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ stat, err := cp.conn.Stats()
+ if err != nil {
+ errR := cp.checkAndRedial(err)
+ if errR != nil {
+ return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
+ } else {
+ return cp.conn.Stats()
+ }
+ }
+
+ return stat, nil
+}
+
func (cp *ConnPool) redial() error {
const op = errors.Op("connection_pool_redial")
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index eaf99be1..6323148b 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,6 +3,7 @@ package beanstalk
import (
"bytes"
"context"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -213,12 +215,49 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
return nil
}
-func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
// register the pipeline
j.pipeline.Store(p)
return nil
}
+// State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("beanstalk_state")
+ stat, err := j.pool.Stats(ctx)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: j.tName,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }
+
+ // set stat, skip errors (replace with 0)
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L523
+ if v, err := strconv.Atoi(stat["current-jobs-ready"]); err == nil {
+ out.Active = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L525
+ if v, err := strconv.Atoi(stat["current-jobs-reserved"]); err == nil {
+		// this is not an error: reserved jobs in beanstalk behave like active jobs
+ out.Reserved = int64(v)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L528
+ if v, err := strconv.Atoi(stat["current-jobs-delayed"]); err == nil {
+ out.Delayed = int64(v)
+ }
+
+ return out, nil
+}
+
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -260,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -315,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func ready(r uint32) bool {
+ return r > 0
+}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 95ad6ecd..f0992cd6 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,17 +2,18 @@ package ephemeral
import (
"context"
- "sync"
"sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -28,14 +29,18 @@ type JobConsumer struct {
cfg *Config
log logger.Logger
eh events.Handler
- pipeline sync.Map
+ pipeline atomic.Value
pq priorityqueue.Queue
localPrefetch chan *Item
// time.sleep goroutines max number
goroutines uint64
- stopCh chan struct{}
+ delayed *int64
+ active *int64
+
+ listeners uint32
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -46,6 +51,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
@@ -61,9 +68,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// initialize a local queue
jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -73,15 +77,14 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
pq: pq,
eh: eh,
goroutines: 0,
+ active: utils.Int64(0),
+ delayed: utils.Int64(0),
stopCh: make(chan struct{}, 1),
}
// initialize a local queue
jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
- // consume from the queue
- go jb.consume()
-
return jb, nil
}
@@ -89,15 +92,11 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ _, ok := j.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
err := j.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
@@ -106,102 +105,70 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("ephemeral_handle_request")
- // handle timeouts
- // theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
-
- go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- time.Sleep(jj.Options.DelayDuration())
-
- // send the item after timeout expired
- j.localPrefetch <- jj
-
- atomic.AddUint64(&j.goroutines, ^uint64(0))
- }(msg)
-
- return nil
- }
-
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
- }
-}
-
-func (j *JobConsumer) consume() {
- // redirect
- for {
- select {
- case item, ok := <-j.localPrefetch:
- if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
- return
- }
-
- // set requeue channel
- item.Options.requeueFn = j.handleItem
-
- j.pq.Insert(item)
- case <-j.stopCh:
- return
- }
- }
+func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: pipe.Name(),
+ Active: atomic.LoadInt64(j.active),
+ Delayed: atomic.LoadInt64(j.delayed),
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
+ }, nil
}
func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- const op = errors.Op("ephemeral_register")
- if _, ok := j.pipeline.Load(pipeline.Name()); ok {
- return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
- }
-
- j.pipeline.Store(pipeline.Name(), true)
-
+ j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == true {
- // mark pipeline as turned off
- j.pipeline.Store(pipeline, false)
- }
- // if not true - do not send the EventPipeStopped, because pipe already stopped
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
return
}
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop the consumer
+ j.stopCh <- struct{}{}
+
j.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
- Pipeline: pipeline,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
}
-func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
- if q, ok := j.pipeline.Load(pipeline); ok {
- if q == false {
- // mark pipeline as turned on
- j.pipeline.Store(pipeline, true)
- }
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
- // if not true - do not send the EventPipeActive, because pipe already active
+ l := atomic.LoadUint32(&j.listeners)
+ // listener already active
+ if l == 1 {
+ j.log.Warn("listener already in the active state")
return
}
+ // resume the consumer on the same channel
+ j.consume()
+
+ atomic.StoreUint32(&j.listeners, 1)
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
- Pipeline: pipeline,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
@@ -220,25 +187,88 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
func (j *JobConsumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
- var pipe string
- j.pipeline.Range(func(key, _ interface{}) bool {
- pipe = key.(string)
- j.pipeline.Delete(key)
- return true
- })
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
select {
// return from the consumer
case j.stopCh <- struct{}{}:
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
- Pipeline: pipe,
+ Pipeline: pipe.Name(),
Start: time.Now(),
Elapsed: 0,
})
+
return nil
case <-ctx.Done():
return errors.E(op, ctx.Err())
}
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+	// theoretically, some bad user may send millions of requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutines, 1)
+ atomic.AddInt64(j.delayed, 1)
+
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutines, ^uint64(0))
+ }(msg)
+
+ return nil
+ }
+
+ // increase number of the active jobs
+ atomic.AddInt64(j.active, 1)
+
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (j *JobConsumer) consume() {
+ go func() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+ item.Options.active = j.active
+ item.Options.delayed = j.delayed
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+ }()
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
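
Pause and Resume above reduce to a small pattern: a uint32 listener count toggled between 0 and 1, plus a stop channel that parks the consume loop. Condensed into a standalone sketch (the pausable type and its names are illustrative):

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    // pausable condenses the listener bookkeeping used by the ephemeral driver.
    type pausable struct {
    	listeners uint32
    	stopCh    chan struct{}
    }

    func (p *pausable) pause() {
    	if atomic.LoadUint32(&p.listeners) == 0 {
    		return // nothing to pause
    	}
    	atomic.AddUint32(&p.listeners, ^uint32(0)) // decrement by one
    	p.stopCh <- struct{}{}                     // park the consume loop
    }

    func (p *pausable) resume(consume func()) {
    	if atomic.LoadUint32(&p.listeners) == 1 {
    		return // already consuming
    	}
    	consume() // restart the loop on the same channels
    	atomic.StoreUint32(&p.listeners, 1)
    }

    func main() {
    	p := &pausable{stopCh: make(chan struct{}, 1)}
    	p.resume(func() { fmt.Println("consume loop started") })
    	p.pause()
    	fmt.Println("listeners:", atomic.LoadUint32(&p.listeners)) // prints 0
    }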
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index 1a61d7e9..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"context"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -40,6 +41,8 @@ type Options struct {
// private
requeueFn func(context.Context, *Item) error
+ active *int64
+ delayed *int64
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -79,12 +82,12 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
func (i *Item) Nack() error {
- // noop for the in-memory
+ i.atomicallyReduceCount()
return nil
}
@@ -93,6 +96,8 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
i.Options.Delay = delay
i.Headers = headers
+ i.atomicallyReduceCount()
+
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
return err
@@ -101,6 +106,19 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
return nil
}
+// atomicallyReduceCount reduces the counter of active or delayed jobs
+func (i *Item) atomicallyReduceCount() {
+ // if job was delayed, reduce number of the delayed jobs
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ return
+ }
+
+ // otherwise, reduce number of the active jobs
+ atomic.AddInt64(i.Options.active, ^int64(0))
+	// nothing else to do for the in-memory driver
+}
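
The decrement above relies on two's-complement arithmetic: adding ^int64(0) (all bits set, i.e. -1) subtracts one, and the same trick is the standard way to decrement the unsigned counters in sync/atomic. A small demonstration:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var active int64 = 3
	atomic.AddInt64(&active, ^int64(0)) // equivalent to atomic.AddInt64(&active, -1)

	var goroutines uint64 = 3
	atomic.AddUint64(&goroutines, ^uint64(0)) // unsigned counters cannot take -1 directly

	fmt.Println(active, goroutines) // 2 2
}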
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 9ce37543..5d419b51 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -2,6 +2,7 @@ package sqs
import (
"context"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -11,10 +12,12 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -261,17 +264,46 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("sqs_state")
+ attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: j.queueURL,
+ AttributeNames: []types.QueueAttributeName{
+ types.QueueAttributeNameApproximateNumberOfMessages,
+ types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
+ types.QueueAttributeNameApproximateNumberOfMessagesNotVisible,
+ },
+ })
+
if err != nil {
- return err
+ return nil, errors.E(op, err)
}
- _, err = j.client.SendMessage(ctx, d)
- if err != nil {
- return err
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ out := &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: *j.queueURL,
+ Ready: ready(atomic.LoadUint32(&j.listeners)),
}
- return nil
+ nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
+ if err == nil {
+ out.Active = int64(nom)
+ }
+
+ delayed, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesDelayed)])
+ if err == nil {
+ out.Delayed = int64(delayed)
+ }
+
+ nv, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessagesNotVisible)])
+ if err == nil {
+ out.Reserved = int64(nv)
+ }
+
+ return out, nil
}
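
SQS only exposes approximate counters, and they arrive as strings in the attributes map, hence the tolerant strconv.Atoi parsing above. A dependency-free sketch of that parsing step; the attribute keys are the real SQS names, but the map is a stand-in for the GetQueueAttributes response:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// assumed shape: GetQueueAttributes returns attribute values as strings
	attrs := map[string]string{
		"ApproximateNumberOfMessages":           "10",
		"ApproximateNumberOfMessagesDelayed":    "2",
		"ApproximateNumberOfMessagesNotVisible": "1",
	}

	parse := func(key string) int64 {
		// ignore malformed values and leave the counter at zero, as the driver does
		n, err := strconv.Atoi(attrs[key])
		if err != nil {
			return 0
		}
		return int64(n)
	}

	active := parse("ApproximateNumberOfMessages")
	delayed := parse("ApproximateNumberOfMessagesDelayed")
	reserved := parse("ApproximateNumberOfMessagesNotVisible")
	fmt.Println(active, delayed, reserved) // 10 2 1
}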
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -280,7 +312,7 @@ func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+ const op = errors.Op("sqs_run")
j.Lock()
defer j.Unlock()
@@ -374,3 +406,20 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
Start: time.Now(),
})
}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// ready reports whether the pipeline has at least one active listener
+func ready(r uint32) bool {
+	return r > 0
+}
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go
new file mode 100644
index 00000000..61856a10
--- /dev/null
+++ b/plugins/jobs/metrics.go
@@ -0,0 +1,91 @@
+package jobs
+
+import (
+ "sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+	// the exporter tracks the workers' memory usage and the job push/processing counters
+ return []prometheus.Collector{p.statsExporter}
+}
+
+const (
+ namespace = "rr_jobs"
+)
+
+type statsExporter struct {
+ workers informer.Informer
+ workersMemory uint64
+ jobsOk uint64
+ pushOk uint64
+ jobsErr uint64
+ pushErr uint64
+}
+
+var (
+ worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
+	pushOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of successfully pushed jobs.", nil, nil)
+	pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of failed job pushes.", nil, nil)
+	jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs that failed during processing in the worker.", nil, nil)
+	jobsOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
+)
+
+func newStatsExporter(stats informer.Informer) *statsExporter {
+ return &statsExporter{
+ workers: stats,
+ workersMemory: 0,
+ jobsOk: 0,
+ pushOk: 0,
+ jobsErr: 0,
+ pushErr: 0,
+ }
+}
+
+func (se *statsExporter) metricsCallback(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event { //nolint:exhaustive
+ case events.EventJobOK:
+ atomic.AddUint64(&se.jobsOk, 1)
+ case events.EventPushOK:
+ atomic.AddUint64(&se.pushOk, 1)
+ case events.EventPushError:
+ atomic.AddUint64(&se.pushErr, 1)
+ case events.EventJobError:
+ atomic.AddUint64(&se.jobsErr, 1)
+ }
+ }
+}
+
+func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
+	// send the descriptions for every metric emitted in Collect
+	d <- worker
+	d <- pushErr
+	d <- pushOk
+	d <- jobsErr
+	d <- jobsOk
+}
+
+func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
+	// get a snapshot of the worker processes
+ workers := se.workers.Workers()
+
+ // cumulative RSS memory in bytes
+ var cum uint64
+
+ // collect the memory
+ for i := 0; i < len(workers); i++ {
+ cum += workers[i].MemoryUsage
+ }
+
+	// send the values to prometheus
+ ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
+ ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
+ ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
+ ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
+ ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
+}
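
For context, a custom collector like statsExporter is registered once and then scraped over HTTP. The sketch below shows the minimal wiring with a stand-in collector; the descriptor name is illustrative, and the address matches the metrics config in the tests further down:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var demo = prometheus.NewDesc("rr_jobs_demo", "Demo gauge.", nil, nil)

type demoCollector struct{}

func (demoCollector) Describe(d chan<- *prometheus.Desc) { d <- demo }

func (demoCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(demo, prometheus.GaugeValue, 1)
}

func main() {
	prometheus.MustRegister(demoCollector{})
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe("127.0.0.1:2112", nil)
}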
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 7707cb8a..5e62c5c5 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -13,6 +13,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/pool"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -57,7 +59,8 @@ type Plugin struct {
stopCh chan struct{}
// internal payloads pool
- pldPool sync.Pool
+ pldPool sync.Pool
+ statsExporter *statsExporter
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -103,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
p.log = log
+ // metrics
+ p.statsExporter = newStatsExporter(p)
+ p.events.AddListener(p.statsExporter.metricsCallback)
+
return nil
}
@@ -200,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
5. Pipeline name
*/
+ start := time.Now()
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobStart,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: 0,
+ })
+
ctx, err := jb.Context()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
@@ -218,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
resp, err := p.workersPool.Exec(exec)
p.RUnlock()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// RR protocol level error, Nack the job
errNack := jb.Nack()
if errNack != nil {
@@ -235,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.putPayload(exec)
err = jb.Ack()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.log.Error("acknowledge error, job might be missed", "error", err)
continue
}
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
}
// handle the response protocol
err = handleResponse(resp.Body, jb, p.log)
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.putPayload(exec)
errNack := jb.Nack()
if errNack != nil {
@@ -254,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// return payload
p.putPayload(exec)
}
@@ -303,6 +356,44 @@ func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
p.jobConstructors[name.Name()] = c
}
+func (p *Plugin) Workers() []*process.State {
+ p.RLock()
+ wrk := p.workersPool.Workers()
+ p.RUnlock()
+
+ ps := make([]*process.State, len(wrk))
+
+ for i := 0; i < len(wrk); i++ {
+ st, err := process.WorkerProcessState(wrk[i])
+ if err != nil {
+ p.log.Error("jobs workers state", "error", err)
+ return nil
+ }
+
+ ps[i] = st
+ }
+
+ return ps
+}
+
+func (p *Plugin) JobsState(ctx context.Context) ([]*jobState.State, error) {
+ const op = errors.Op("jobs_plugin_drivers_state")
+ jst := make([]*jobState.State, 0, len(p.consumers))
+ for k := range p.consumers {
+ d := p.consumers[k]
+ newCtx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(p.cfg.Timeout))
+ state, err := d.State(newCtx)
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, err)
+ }
+
+ jst = append(jst, state)
+ cancel()
+ }
+ return jst, nil
+}
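
JobsState gives every driver its own deadline and releases it on both paths. A minimal sketch of that per-iteration context pattern, where queryOne stands in for a driver's State call and the driver names are illustrative:

package main

import (
	"context"
	"fmt"
	"time"
)

func queryOne(ctx context.Context) (string, error) {
	select {
	case <-time.After(10 * time.Millisecond): // simulated driver round-trip
		return "ok", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	drivers := []string{"amqp", "beanstalk", "sqs", "ephemeral"}
	for _, d := range drivers {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		st, err := queryOne(ctx)
		cancel() // release the timer promptly on every iteration, as the plugin does
		if err != nil {
			fmt.Println(d, "error:", err)
			continue
		}
		fmt.Println(d, st)
	}
}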
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
@@ -319,7 +410,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
if err != nil {
return errors.E(op, err)
}
@@ -332,6 +423,7 @@ func (p *Plugin) Reset() error {
func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j.Options.Pipeline)
if !ok {
@@ -357,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error {
err := d.Push(ctx, j)
if err != nil {
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
+		cancel()
return errors.E(op, err)
}
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushOK,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
+	cancel()
return nil
}
@@ -370,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -392,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
err := d.Push(ctx, j[i])
if err != nil {
cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j[i].Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
@@ -536,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
case events.EventPipePaused:
p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
- p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
- p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushOK:
p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushError:
- p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobError:
- p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeActive:
p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeStopped:
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index c89877e3..e195c407 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -15,7 +15,7 @@ Types are:
- `NO_ERROR`: contains only `type` and empty `data`.
- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
job,
- `dalay_seconds`: to delay a queue for a provided amount of seconds, `headers` - job's headers represented as hashmap
+  `delay_seconds`: to delay the job for the provided number of seconds, `headers` - job's headers represented as a hashmap
with string key and array of strings as a value.
For example:
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 7f9859fb..94f903d5 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "context"
+
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
@@ -14,7 +16,7 @@ type rpc struct {
}
func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push")
// convert transport entity into domain
// how we can do this quickly
@@ -32,7 +34,7 @@ func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
}
func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("jobs_rpc_push")
+ const op = errors.Op("rpc_push_batch")
l := len(j.GetJobs())
@@ -79,7 +81,7 @@ func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
// 2. Pipeline name
// 3. Options related to the particular pipeline
func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
- const op = errors.Op("rcp_declare_pipeline")
+ const op = errors.Op("rpc_declare_pipeline")
pipe := &pipeline.Pipeline{}
for i := range req.GetPipeline() {
@@ -95,7 +97,7 @@ func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error
}
func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
- const op = errors.Op("rcp_declare_pipeline")
+	const op = errors.Op("rpc_destroy_pipeline")
var destroyed []string //nolint:prealloc
for i := 0; i < len(req.GetPipelines()); i++ {
@@ -112,6 +114,28 @@ func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) err
return nil
}
+func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
+ const op = errors.Op("rpc_stats")
+ state, err := r.p.JobsState(context.Background())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ for i := 0; i < len(state); i++ {
+ resp.Stats = append(resp.Stats, &jobsv1beta.Stat{
+ Pipeline: state[i].Pipeline,
+ Driver: state[i].Driver,
+ Queue: state[i].Queue,
+ Active: state[i].Active,
+ Delayed: state[i].Delayed,
+ Reserved: state[i].Reserved,
+ Ready: state[i].Ready,
+ })
+ }
+
+ return nil
+}
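
A hedged sketch of exercising the new endpoint from a Go client over RoadRunner's RPC socket; the address follows the test configs, and the method name assumes the usual `<plugin>.<Method>` convention, i.e. jobs.Stat:

package main

import (
	"fmt"
	"net"
	"net/rpc"

	goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
	jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:6001")
	if err != nil {
		panic(err)
	}
	client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))

	resp := &jobsv1beta.Stats{}
	if err := client.Call("jobs.Stat", &jobsv1beta.Empty{}, resp); err != nil {
		panic(err)
	}

	for _, s := range resp.GetStats() {
		fmt.Printf("%s (%s): active=%d delayed=%d reserved=%d ready=%v\n",
			s.GetPipeline(), s.GetDriver(), s.GetActive(), s.GetDelayed(), s.GetReserved(), s.GetReady())
	}
}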
+
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
headers := map[string][]string{}
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index c79f3eb0..fd30eb54 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -1,8 +1,10 @@
package memory
import (
+ "context"
"sync"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -65,21 +67,25 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("pubsub_memory")
+ select {
+ case msg := <-p.pushCh:
+ if msg == nil {
+ return nil, nil
+ }
- // push only messages, which topics are subscibed
- // TODO better???
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(msg.Topic); ok {
- return msg, nil
+ p.RLock()
+ defer p.RUnlock()
+		// deliver only messages whose topics have active subscribers;
+		// otherwise fall through and return nil
+ if ok := p.storage.Contains(msg.Topic); ok {
+ return msg, nil
+ }
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
}
return nil, nil
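
The context-aware Next lets readers shut down without racing a separate exit channel. A self-contained sketch of the consumer side, where cancellation is the normal exit path; the message type and channel are stand-ins:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type message struct{ Topic string }

func next(ctx context.Context, ch <-chan *message) (*message, error) {
	select {
	case m := <-ch:
		return m, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ch := make(chan *message, 1)
	ch <- &message{Topic: "foo"}

	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(10 * time.Millisecond); cancel() }()

	for {
		m, err := next(ctx, ch)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return // normal shutdown, mirrors the TimeOut branch in the plugins
			}
			fmt.Println("error:", err)
			return
		}
		fmt.Println("got message on", m.Topic)
	}
}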
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
new file mode 100644
index 00000000..89d707af
--- /dev/null
+++ b/plugins/redis/jobs/config.go
@@ -0,0 +1,34 @@
+package jobs
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults fills the config with default values
+func (s *Config) InitDefaults() {
+	if s.Addrs == nil {
+		s.Addrs = []string{"127.0.0.1:6379"} // default addr points to the local redis instance
+ }
+}
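
The InitDefaults contract is that a zero-value config becomes usable while explicit values are left alone. A tiny sketch with a trimmed-down config type (the real one above carries many more fields):

package main

import "fmt"

type config struct{ Addrs []string }

func (c *config) InitDefaults() {
	if c.Addrs == nil {
		c.Addrs = []string{"127.0.0.1:6379"}
	}
}

func main() {
	c := &config{}
	c.InitDefaults()
	fmt.Println(c.Addrs) // [127.0.0.1:6379]

	c2 := &config{Addrs: []string{"10.0.0.5:6379"}} // user-provided value is preserved
	c2.InitDefaults()
	fmt.Println(c2.Addrs) // [10.0.0.5:6379]
}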
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/consumer.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
new file mode 100644
index 00000000..415ac457
--- /dev/null
+++ b/plugins/redis/jobs/item.go
@@ -0,0 +1 @@
+package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
new file mode 100644
index 00000000..5b760952
--- /dev/null
+++ b/plugins/redis/kv/config.go
@@ -0,0 +1,34 @@
+package kv
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults fills the config with default values
+func (s *Config) InitDefaults() {
+	if s.Addrs == nil {
+		s.Addrs = []string{"127.0.0.1:6379"} // default addr points to the local redis instance
+ }
+}
diff --git a/plugins/redis/kv.go b/plugins/redis/kv/kv.go
index 29f89d46..b41cb86c 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -1,4 +1,4 @@
-package redis
+package kv
import (
"context"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3c62a63f..961182a9 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -9,6 +9,8 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv"
+ redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub"
)
const PluginName = "redis"
@@ -62,7 +64,7 @@ func (p *Plugin) Available() {}
// KVConstruct provides KV storage implementation over the redis plugin
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
+ st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
@@ -71,5 +73,5 @@ func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
}
func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
- return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
+ return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/channel.go b/plugins/redis/pubsub/channel.go
index 0cd62d19..a1655ab2 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/pubsub/channel.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
@@ -92,6 +92,6 @@ func (r *redisChannel) stop() error {
return nil
}
-func (r *redisChannel) message() *pubsub.Message {
- return <-r.out
+func (r *redisChannel) message() chan *pubsub.Message {
+ return r.out
}
diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go
new file mode 100644
index 00000000..bf8d2fc9
--- /dev/null
+++ b/plugins/redis/pubsub/config.go
@@ -0,0 +1,34 @@
+package pubsub
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults fills the config with default values
+func (s *Config) InitDefaults() {
+	if s.Addrs == nil {
+		s.Addrs = []string{"127.0.0.1:6379"} // default addr points to the local redis instance
+ }
+}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub/pubsub.go
index 01efc623..c9ad3d58 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -1,4 +1,4 @@
-package redis
+package pubsub
import (
"context"
@@ -172,6 +172,12 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return p.channel.message(), nil
+func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
+ const op = errors.Op("redis_driver_next")
+ select {
+ case msg := <-p.channel.message():
+ return msg, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
}
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index d3271d6c..a9a5a63c 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -20,12 +20,12 @@ type Plugin struct {
log logger.Logger
watcher *Watcher
services map[string]interface{}
- res resetter.Resetter
+ res *resetter.Plugin
stopc chan struct{}
}
// Init controller service
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res resetter.Resetter) error {
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger, res *resetter.Plugin) error {
const op = errors.Op("reload_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -86,9 +86,9 @@ func (s *Plugin) Serve() chan error {
}
// make a map with unique services
- // so, if we would have a 100 events from http service
- // in map we would see only 1 key and it's config
- treshholdc := make(chan struct {
+	// so if we receive 100 events from the http service,
+	// the map will contain only one key with its config
+ thCh := make(chan struct {
serviceConfig ServiceConfig
service string
}, thresholdChanBuffer)
@@ -98,7 +98,7 @@ func (s *Plugin) Serve() chan error {
go func() {
for e := range s.watcher.Event {
- treshholdc <- struct {
+ thCh <- struct {
serviceConfig ServiceConfig
service string
}{serviceConfig: s.cfg.Services[e.service], service: e.service}
@@ -111,7 +111,7 @@ func (s *Plugin) Serve() chan error {
go func() {
for {
select {
- case cfg := <-treshholdc:
+ case cfg := <-thCh:
// logic is following:
// restart
timer.Stop()
@@ -124,7 +124,7 @@ func (s *Plugin) Serve() chan error {
case <-timer.C:
if len(updated) > 0 {
for name := range updated {
- err := s.res.ResetByName(name)
+ err := s.res.Reset(name)
if err != nil {
timer.Stop()
errCh <- errors.E(op, err)
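
The reload loop above is a debounce: every filesystem event resets a timer, and only when the timer fires are the accumulated services reset. A compact sketch of the same pattern, with illustrative service names and countdown value:

package main

import (
	"fmt"
	"time"
)

func main() {
	events := make(chan string, 8)
	go func() {
		for _, s := range []string{"http", "http", "jobs"} {
			events <- s
		}
	}()

	updated := map[string]struct{}{}
	timer := time.NewTimer(50 * time.Millisecond)

	for {
		select {
		case svc := <-events:
			// accumulate unique services and restart the countdown
			updated[svc] = struct{}{}
			timer.Reset(50 * time.Millisecond)
		case <-timer.C:
			for name := range updated {
				fmt.Println("resetting", name) // stand-in for res.Reset(name)
			}
			return
		}
	}
}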
diff --git a/plugins/resetter/interface.go b/plugins/resetter/interface.go
index 47d8d791..0defcaba 100644
--- a/plugins/resetter/interface.go
+++ b/plugins/resetter/interface.go
@@ -1,17 +1,7 @@
package resetter
-// If plugin implements Resettable interface, than it state can be resetted without reload in runtime via RPC/HTTP
-type Resettable interface {
- // Reset reload all plugins
- Reset() error
-}
-
-// Resetter interface is the Resetter plugin main interface
+// Resetter is implemented by plugins that can reset their state at runtime without a restart
type Resetter interface {
- // Reset all registered plugins
- ResetAll() error
- // Reset by plugin name
- ResetByName(string) error
- // GetAll registered plugins
- GetAll() []string
+	// Reset resets the plugin state
+ Reset() error
}
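
With the interface slimmed down to a single method, a plugin opts in by implementing Reset. The sketch below is illustrative only; the field and its reinitialization are assumptions, not a real plugin:

package myplugin

import "sync"

type Plugin struct {
	mu    sync.Mutex
	state map[string]int // stand-in for whatever the plugin rebuilds on reset
}

// Reset rebuilds the internal state in place; the resetter plugin invokes it
// by name via its Reset(name) RPC.
func (p *Plugin) Reset() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.state = make(map[string]int)
	return nil
}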
diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go
index 4feb692a..b2fe59af 100644
--- a/plugins/resetter/plugin.go
+++ b/plugins/resetter/plugin.go
@@ -3,61 +3,32 @@ package resetter
import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
)
const PluginName = "resetter"
type Plugin struct {
- registry map[string]Resettable
- log logger.Logger
+ registry map[string]Resetter
}
-func (p *Plugin) ResetAll() error {
- const op = errors.Op("resetter_plugin_reset_all")
- for name := range p.registry {
- err := p.registry[name].Reset()
- if err != nil {
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-func (p *Plugin) ResetByName(plugin string) error {
- const op = errors.Op("resetter_plugin_reset_by_name")
- if plugin, ok := p.registry[plugin]; ok {
- return plugin.Reset()
- }
- return errors.E(op, errors.Errorf("can't find plugin: %s", plugin))
-}
-
-func (p *Plugin) GetAll() []string {
- all := make([]string, 0, len(p.registry))
- for name := range p.registry {
- all = append(all, name)
- }
- return all
-}
-
-func (p *Plugin) Init(log logger.Logger) error {
- p.registry = make(map[string]Resettable)
- p.log = log
+func (p *Plugin) Init() error {
+ p.registry = make(map[string]Resetter)
return nil
}
// Reset named service.
func (p *Plugin) Reset(name string) error {
+ const op = errors.Op("resetter_plugin_reset_by_name")
svc, ok := p.registry[name]
if !ok {
- return errors.E("no such service", errors.Str(name))
+ return errors.E(op, errors.Errorf("no such service: %s", name))
}
return svc.Reset()
}
// RegisterTarget resettable service.
-func (p *Plugin) RegisterTarget(name endure.Named, r Resettable) error {
+func (p *Plugin) RegisterTarget(name endure.Named, r Resetter) error {
p.registry[name.Name()] = r
return nil
}
@@ -80,5 +51,5 @@ func (p *Plugin) Available() {
// RPC returns associated rpc service.
func (p *Plugin) RPC() interface{} {
- return &rpc{srv: p, log: p.log}
+ return &rpc{srv: p}
}
diff --git a/plugins/resetter/rpc.go b/plugins/resetter/rpc.go
index 69c955b0..79171b5c 100644
--- a/plugins/resetter/rpc.go
+++ b/plugins/resetter/rpc.go
@@ -1,30 +1,29 @@
package resetter
-import "github.com/spiral/roadrunner/v2/plugins/logger"
+import "github.com/spiral/errors"
type rpc struct {
srv *Plugin
- log logger.Logger
}
// List all resettable plugins.
func (rpc *rpc) List(_ bool, list *[]string) error {
- rpc.log.Debug("started List method")
*list = make([]string, 0)
for name := range rpc.srv.registry {
*list = append(*list, name)
}
- rpc.log.Debug("services list", "services", *list)
-
- rpc.log.Debug("finished List method")
return nil
}
// Reset named plugin.
func (rpc *rpc) Reset(service string, done *bool) error {
- rpc.log.Debug("started Reset method for the service", "service", service)
- defer rpc.log.Debug("finished Reset method for the service", "service", service)
+ const op = errors.Op("resetter_rpc_reset")
+ err := rpc.srv.Reset(service)
+ if err != nil {
+ *done = false
+ return errors.E(op, err)
+ }
*done = true
- return rpc.srv.Reset(service)
+ return nil
}
diff --git a/plugins/server/command.go b/plugins/server/command.go
index e0b61896..b8bc1395 100644
--- a/plugins/server/command.go
+++ b/plugins/server/command.go
@@ -29,5 +29,5 @@ func (server *Plugin) scanCommand(cmd []string) error {
return nil
}
}
- return errors.E(errors.Str("scan failed, possible path not found"), op)
+	return errors.E(errors.Str("scan failed, the path might not exist; this is not a fatal error"), op)
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 1694cdf1..16e3bd8c 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -97,7 +97,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// try to find a path here
err := server.scanCommand(cmdArgs)
if err != nil {
- server.log.Info("scan command", "error", err)
+ server.log.Info("scan command", "reason", err)
}
return func() *exec.Cmd {
diff --git a/plugins/service/plugin.go b/plugins/service/plugin.go
index 28b84443..3bd0f956 100644
--- a/plugins/service/plugin.go
+++ b/plugins/service/plugin.go
@@ -4,7 +4,7 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 2df23f11..395b056f 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -13,7 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -58,6 +58,10 @@ type Plugin struct {
// server which produces commands to the pool
server server.Server
+ // stop receiving messages
+ cancel context.CancelFunc
+ ctx context.Context
+
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
}
@@ -90,6 +94,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.server = server
p.log = log
p.broadcaster = b
+
+ ctx, cancel := context.WithCancel(context.Background())
+ p.ctx = ctx
+ p.cancel = cancel
return nil
}
@@ -118,7 +126,8 @@ func (p *Plugin) Serve() chan error {
Supervisor: p.cfg.Pool.Supervisor,
}, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
if err != nil {
- errCh <- err
+ errCh <- errors.E(op, err)
+ return
}
p.accessValidator = p.defaultAccessValidator(p.phpPool)
@@ -129,17 +138,17 @@ func (p *Plugin) Serve() chan error {
// we need here only Reader part of the interface
go func(ps pubsub.Reader) {
for {
- select {
- case <-p.serveExit:
- return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
+ data, err := ps.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
- p.workersPool.Queue(data)
+
+ errCh <- errors.E(op, err)
+ return
}
+
+ p.workersPool.Queue(data)
}
}(p.subReader)
@@ -149,6 +158,8 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
+ // cancel context
+ p.cancel()
p.Lock()
if p.phpPool == nil {
p.Unlock()
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
index 6a6f59af..bd8e3b43 100644
--- a/proto/jobs/v1beta/jobs.pb.go
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -440,6 +440,149 @@ func (x *HeaderValue) GetValue() []string {
return nil
}
+type Stats struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Stats []*Stat `protobuf:"bytes,1,rep,name=Stats,proto3" json:"Stats,omitempty"`
+}
+
+func (x *Stats) Reset() {
+ *x = Stats{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[8]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Stats) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Stats) ProtoMessage() {}
+
+func (x *Stats) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[8]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Stats.ProtoReflect.Descriptor instead.
+func (*Stats) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{8}
+}
+
+func (x *Stats) GetStats() []*Stat {
+ if x != nil {
+ return x.Stats
+ }
+ return nil
+}
+
+// Stat is used as an element of the response for the Stats RPC call
+type Stat struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Pipeline string `protobuf:"bytes,1,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
+ Driver string `protobuf:"bytes,2,opt,name=driver,proto3" json:"driver,omitempty"`
+ Queue string `protobuf:"bytes,3,opt,name=queue,proto3" json:"queue,omitempty"`
+ Active int64 `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"`
+ Delayed int64 `protobuf:"varint,5,opt,name=delayed,proto3" json:"delayed,omitempty"`
+ Reserved int64 `protobuf:"varint,6,opt,name=reserved,proto3" json:"reserved,omitempty"`
+ Ready bool `protobuf:"varint,7,opt,name=ready,proto3" json:"ready,omitempty"`
+}
+
+func (x *Stat) Reset() {
+ *x = Stat{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[9]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Stat) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Stat) ProtoMessage() {}
+
+func (x *Stat) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[9]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Stat.ProtoReflect.Descriptor instead.
+func (*Stat) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{9}
+}
+
+func (x *Stat) GetPipeline() string {
+ if x != nil {
+ return x.Pipeline
+ }
+ return ""
+}
+
+func (x *Stat) GetDriver() string {
+ if x != nil {
+ return x.Driver
+ }
+ return ""
+}
+
+func (x *Stat) GetQueue() string {
+ if x != nil {
+ return x.Queue
+ }
+ return ""
+}
+
+func (x *Stat) GetActive() int64 {
+ if x != nil {
+ return x.Active
+ }
+ return 0
+}
+
+func (x *Stat) GetDelayed() int64 {
+ if x != nil {
+ return x.Delayed
+ }
+ return 0
+}
+
+func (x *Stat) GetReserved() int64 {
+ if x != nil {
+ return x.Reserved
+ }
+ return 0
+}
+
+func (x *Stat) GetReady() bool {
+ if x != nil {
+ return x.Ready
+ }
+ return false
+}
+
var File_jobs_proto protoreflect.FileDescriptor
var file_jobs_proto_rawDesc = []byte{
@@ -488,8 +631,23 @@ var file_jobs_proto_rawDesc = []byte{
0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x22, 0x23, 0x0a,
0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
- 0x75, 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62,
- 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x75, 0x65, 0x22, 0x30, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x27, 0x0a, 0x05, 0x53,
+ 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6a, 0x6f, 0x62,
+ 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x52, 0x05, 0x53,
+ 0x74, 0x61, 0x74, 0x73, 0x22, 0xb4, 0x01, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x1a, 0x0a,
+ 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x08, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x72, 0x69,
+ 0x76, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x72, 0x69, 0x76, 0x65,
+ 0x72, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76,
+ 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x12,
+ 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x07, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x65, 0x73,
+ 0x65, 0x72, 0x76, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x07,
+ 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x42, 0x0f, 0x5a, 0x0d, 0x2e,
+ 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -504,7 +662,7 @@ func file_jobs_proto_rawDescGZIP() []byte {
return file_jobs_proto_rawDescData
}
-var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
+var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_jobs_proto_goTypes = []interface{}{
(*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
(*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
@@ -514,21 +672,24 @@ var file_jobs_proto_goTypes = []interface{}{
(*Job)(nil), // 5: jobs.v1beta.Job
(*Options)(nil), // 6: jobs.v1beta.Options
(*HeaderValue)(nil), // 7: jobs.v1beta.HeaderValue
- nil, // 8: jobs.v1beta.DeclareRequest.PipelineEntry
- nil, // 9: jobs.v1beta.Job.HeadersEntry
+ (*Stats)(nil), // 8: jobs.v1beta.Stats
+ (*Stat)(nil), // 9: jobs.v1beta.Stat
+ nil, // 10: jobs.v1beta.DeclareRequest.PipelineEntry
+ nil, // 11: jobs.v1beta.Job.HeadersEntry
}
var file_jobs_proto_depIdxs = []int32{
- 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
- 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
- 8, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry
- 9, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
- 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
- 7, // 5: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
- 6, // [6:6] is the sub-list for method output_type
- 6, // [6:6] is the sub-list for method input_type
- 6, // [6:6] is the sub-list for extension type_name
- 6, // [6:6] is the sub-list for extension extendee
- 0, // [0:6] is the sub-list for field type_name
+ 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
+ 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
+ 10, // 2: jobs.v1beta.DeclareRequest.pipeline:type_name -> jobs.v1beta.DeclareRequest.PipelineEntry
+ 11, // 3: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
+ 6, // 4: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
+ 9, // 5: jobs.v1beta.Stats.Stats:type_name -> jobs.v1beta.Stat
+ 7, // 6: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
+ 7, // [7:7] is the sub-list for method output_type
+ 7, // [7:7] is the sub-list for method input_type
+ 7, // [7:7] is the sub-list for extension type_name
+ 7, // [7:7] is the sub-list for extension extendee
+ 0, // [0:7] is the sub-list for field type_name
}
func init() { file_jobs_proto_init() }
@@ -633,6 +794,30 @@ func file_jobs_proto_init() {
return nil
}
}
+ file_jobs_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Stats); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Stat); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -640,7 +825,7 @@ func file_jobs_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_jobs_proto_rawDesc,
NumEnums: 0,
- NumMessages: 10,
+ NumMessages: 12,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index 68d2ed97..c030c0df 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -43,3 +43,18 @@ message Options {
message HeaderValue {
repeated string value = 1;
}
+
+message Stats {
+ repeated Stat Stats = 1;
+}
+
+// Stat is used as a response for the Stats RPC call
+message Stat {
+ string pipeline = 1;
+ string driver = 2;
+ string queue = 3;
+ int64 active = 4;
+ int64 delayed = 5;
+ int64 reserved = 6;
+ bool ready = 7;
+}
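
The generated Go types mirror this schema one-to-one. A short sketch of filling the response on the server side; the values are illustrative:

package main

import (
	"fmt"

	jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
)

func main() {
	resp := &jobsv1beta.Stats{
		Stats: []*jobsv1beta.Stat{
			{
				Pipeline: "test-local",
				Driver:   "ephemeral",
				Active:   3,
				Delayed:  1,
				Ready:    true,
			},
		},
	}
	fmt.Println(resp.GetStats()[0].GetPipeline()) // test-local
}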
diff --git a/tests/env/Dockerfile-beanstalkd.yaml b/tests/env/Dockerfile-beanstalkd.yaml
index 852385a1..7b36f8d3 100644
--- a/tests/env/Dockerfile-beanstalkd.yaml
+++ b/tests/env/Dockerfile-beanstalkd.yaml
@@ -1,15 +1,13 @@
-FROM archlinux:latest
+FROM ubuntu:latest
ARG DEBIAN_FRONTEND=noninteractive
-RUN pacman-key --init
-RUN Y | pacman -Syu --noconfirm
-RUN Y | pacman -S --noconfirm curl base-devel pkgconf
+RUN apt-get update && apt-get install -y curl build-essential pkg-config
RUN curl -sL https://github.com/kr/beanstalkd/archive/v1.12.tar.gz | tar xvz -C /tmp
WORKDIR /tmp/beanstalkd-1.12
-RUN make -j12
+RUN make
RUN cp beanstalkd /usr/bin
EXPOSE 11300
diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml
index 4f58d543..d93bc5af 100644
--- a/tests/env/docker-compose.yaml
+++ b/tests/env/docker-compose.yaml
@@ -1,4 +1,4 @@
-version: '3.8'
+version: '3'
services:
memcached:
@@ -37,3 +37,8 @@ services:
ports:
- "127.0.0.1:15672:15672"
- "127.0.0.1:5672:5672"
+
+ prometheus:
+ image: prom/prometheus
+ ports:
+ - "9090:9090"
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index a78b17e1..3dcc5c2c 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -1,6 +1,7 @@
package broadcast
import (
+ "context"
"net"
"net/rpc"
"os"
@@ -10,6 +11,7 @@ import (
"testing"
"time"
+ goRedis "github.com/go-redis/redis/v8"
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
@@ -176,6 +178,9 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -274,8 +279,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6002"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
time.Sleep(time.Second * 5)
@@ -283,9 +291,17 @@ func TestBroadcastSameSubscriber(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
+
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
@@ -384,8 +400,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
time.Sleep(time.Second * 2)
t.Run("PublishHelloFooFoo2Foo3", BroadcastPublishFooFoo2Foo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo2", BroadcastPublishFoo2("6003"))
+ time.Sleep(time.Second)
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
+ time.Sleep(time.Second)
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
time.Sleep(time.Second * 4)
@@ -393,7 +412,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
+
time.Sleep(time.Second * 5)
+
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6379"))
+ t.Run("RedisFlush", redisFlushAll("127.0.0.1:6378"))
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
@@ -446,6 +469,7 @@ func BroadcastPublishFoo3(port string) func(t *testing.T) {
}
}
}
+
func BroadcastPublishAsyncFooFoo2Foo3(port string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
@@ -475,3 +499,15 @@ func makeMessage(payload []byte, topics ...string) *websocketsv1.Request {
return m
}
+
+func redisFlushAll(addr string) func(t *testing.T) {
+ return func(t *testing.T) {
+ rdb := goRedis.NewClient(&goRedis.Options{
+ Addr: addr,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ })
+
+ rdb.FlushAll(context.Background())
+ }
+}
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index 01ad1479..ed5139a8 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,14 +16,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +44,16 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
-
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ errCh <- err
+ return
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
}
}()
@@ -68,8 +64,7 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index ee072ffe..20cc1b24 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,13 +16,14 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -40,22 +43,20 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
}
}()
@@ -64,7 +65,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index 288201d1..2f416d2e 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
}
}()
@@ -66,7 +65,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index 56f79c0f..e2209648 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index e7cd7e60..122046b8 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 08272196..6ace0a79 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -1,8 +1,10 @@
package plugins
import (
+ "context"
"fmt"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -14,15 +16,14 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
-
- exit chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
-
- p.exit = make(chan struct{}, 1)
+ p.ctx, p.cancel = context.WithCancel(context.Background())
return nil
}
@@ -42,22 +43,20 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- select {
- case <-p.exit:
- return
- default:
- msg, err := p.driver.Next()
- if err != nil {
- errCh <- err
+ msg, err := p.driver.Next(p.ctx)
+ if err != nil {
+ if errors.Is(errors.TimeOut, err) {
return
}
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
-
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ if msg == nil {
+ continue
}
+
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
}
}()
@@ -66,8 +65,7 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
-
- p.exit <- struct{}{}
+ p.cancel()
return nil
}
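
The three broadcast test plugins above all follow the same migration: the buffered `exit` channel is replaced by a `context.Context`/`context.CancelFunc` pair, `Stop()` calls `cancel()`, and the subscriber goroutine leaves when the blocking read observes the cancellation. A condensed sketch of the resulting loop; the `SubReader.Next(ctx)` signature and the `errors.TimeOut` wrapping of cancellation are taken from the diff itself:

```go
// Condensed form of the Serve loop shared by plugins 4-6 above.
go func() {
	for {
		msg, err := p.driver.Next(p.ctx) // blocks until a message arrives or ctx is cancelled
		if err != nil {
			if errors.Is(errors.TimeOut, err) {
				return // Stop() called p.cancel(); exit quietly
			}
			errCh <- err // any other error fails the plugin
			return
		}
		if msg == nil {
			continue
		}
		p.log.Info(fmt.Sprintf("%s: %s", pluginName, *msg))
	}
}()
```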
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index bd804264..a48c8972 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -22,7 +22,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/gzip"
"github.com/spiral/roadrunner/v2/plugins/informer"
diff --git a/tests/plugins/informer/informer_test.go b/tests/plugins/informer/informer_test.go
index 61be85a1..c3b5c6a6 100644
--- a/tests/plugins/informer/informer_test.go
+++ b/tests/plugins/informer/informer_test.go
@@ -12,7 +12,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 095140b8..21897f40 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -5,7 +5,7 @@ import (
"time"
"github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/state/process"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/server"
)
diff --git a/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml
new file mode 100644
index 00000000..4db9a676
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-metrics.yaml
@@ -0,0 +1,27 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+metrics:
+ address: 127.0.0.1:2112
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+  # number of logical CPU cores by default
+ num_pollers: 10
+  # 1 million by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 1
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
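
For orientation, a hypothetical Go mirror of the `jobs` section of this config. The field names follow the YAML keys above; the concrete types and `mapstructure` tags are assumptions, not the canonical struct from plugins/jobs:

```go
// Hypothetical mirror of the jobs section above; key names come from the
// YAML, types and tags are assumed.
type JobsConfig struct {
	NumPollers   int         `mapstructure:"num_pollers"`
	PipelineSize uint64      `mapstructure:"pipeline_size"`
	Pool         *PoolConfig `mapstructure:"pool"`
}

type PoolConfig struct {
	NumWorkers      uint64        `mapstructure:"num_workers"`
	MaxJobs         uint64        `mapstructure:"max_jobs"`
	AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
	DestroyTimeout  time.Duration `mapstructure:"destroy_timeout"`
}
```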
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go
index a268ebb8..5067ef9f 100644
--- a/tests/plugins/jobs/helpers.go
+++ b/tests/plugins/jobs/helpers.go
@@ -8,6 +8,7 @@ import (
"testing"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -18,6 +19,7 @@ const (
pause string = "jobs.Pause"
destroy string = "jobs.Destroy"
resume string = "jobs.Resume"
+ stat string = "jobs.Stat"
)
func resumePipes(pipes ...string) func(t *testing.T) {
@@ -57,7 +59,7 @@ func pushToDisabledPipe(pipeline string) func(t *testing.T) {
er := &jobsv1beta.Empty{}
err = client.Call(push, req, er)
- assert.Error(t, err)
+ assert.NoError(t, err)
}
}
@@ -85,6 +87,30 @@ func pushToPipe(pipeline string) func(t *testing.T) {
}
}
+func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
+ Job: "some/php/namespace",
+ Id: "1",
+ Payload: `{"hello":"world"}`,
+ Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
+ Options: &jobsv1beta.Options{
+ Priority: 1,
+ Pipeline: pipeline,
+ Delay: delay,
+ },
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call(push, req, er)
+ assert.NoError(t, err)
+ }
+}
+
func pushToPipeErr(pipeline string) func(t *testing.T) {
return func(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
@@ -183,3 +209,26 @@ func deleteProxy(name string, t *testing.T) {
_ = resp.Body.Close()
}
}
+
+func stats(state *jobState.State) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ st := &jobsv1beta.Stats{}
+ er := &jobsv1beta.Empty{}
+
+ err = client.Call(stat, er, st)
+ require.NoError(t, err)
+ require.NotNil(t, st)
+
+ state.Queue = st.Stats[0].Queue
+ state.Pipeline = st.Stats[0].Pipeline
+ state.Driver = st.Stats[0].Driver
+ state.Active = st.Stats[0].Active
+ state.Delayed = st.Stats[0].Delayed
+ state.Reserved = st.Stats[0].Reserved
+ state.Ready = st.Stats[0].Ready
+ }
+}
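
The `stats` helper copies the first entry of the `jobs.Stat` RPC response into a `pkg/state/job` State. A sketch of that struct as implied by the assignments above; the canonical definition is pkg/state/job/state.go added by this commit, and the per-field semantics are inferred from the test assertions below (paused pipeline: Active/Delayed counted, Ready false; after resume: counters drained, Ready true):

```go
// Sketch of pkg/state/job.State as implied by stats(); see
// pkg/state/job/state.go in this commit for the canonical definition.
type State struct {
	Pipeline string // pipeline name, e.g. "test-3"
	Driver   string // driver name: amqp, beanstalk, ephemeral, sqs
	Queue    string // underlying queue, tube, or queue URL
	Active   int64  // jobs currently being processed
	Delayed  int64  // jobs scheduled for later delivery
	Reserved int64  // jobs reserved by the driver but not yet processed
	Ready    bool   // whether the pipeline is consuming (resumed)
}
```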
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index 7096a467..df84fabc 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -133,6 +134,10 @@ func TestAMQPDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -228,6 +233,10 @@ func TestAMQPJobsError(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
@@ -305,44 +314,67 @@ func TestAMQPJobsError(t *testing.T) {
wg.Wait()
}
-func declareAMQPPipe(t *testing.T) {
- conn, err := net.Dial("tcp", "127.0.0.1:6001")
+func TestAMQPNoGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "amqp",
- "name": "test-3",
- "routing-key": "test-3",
- "queue": "default",
- "exchange-type": "direct",
- "exchange": "amqp.default",
- "prefetch": "100",
- "priority": "3",
- "exclusive": "true",
- "multiple_ask": "true",
- "requeue_on_fail": "true",
- }}
+ cfg := &config.Viper{
+ Path: "amqp/.rr-no-global.yaml",
+ Prefix: "rr",
+ }
- er := &jobsv1beta.Empty{}
- err = client.Call("jobs.Declare", pipe, er)
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &amqp.Plugin{},
+ )
assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
}
-func TestAMQPNoGlobalSection(t *testing.T) {
+func TestAMQPStats(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
cfg := &config.Viper{
- Path: "amqp/.rr-no-global.yaml",
+ Path: "amqp/.rr-amqp-declare.yaml",
Prefix: "rr",
}
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes()
+
err = cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
- &logger.ZapLogger{},
+ mockLogger,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
@@ -355,6 +387,113 @@ func TestAMQPNoGlobalSection(t *testing.T) {
t.Fatal(err)
}
- _, err = cont.Serve()
- require.Error(t, err)
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareAMQPPipeline", declareAMQPPipe)
+ t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PushAMQPPipeline", pushToPipe("test-3"))
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
+
+ out := &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "amqp")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+ assert.Equal(t, false, out.Ready)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "amqp")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+ assert.Equal(t, true, out.Ready)
+
+ time.Sleep(time.Second)
+ t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func declareAMQPPipe(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
+ "driver": "amqp",
+ "name": "test-3",
+ "routing-key": "test-3",
+ "queue": "default",
+ "exchange-type": "direct",
+ "exchange": "amqp.default",
+ "prefetch": "100",
+ "priority": "3",
+ "exclusive": "true",
+ "multiple_ask": "true",
+ "requeue_on_fail": "true",
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Declare", pipe, er)
+ assert.NoError(t, err)
}
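
TestAMQPStats and the Beanstalk, ephemeral, and SQS variants below repeat the same serve/stop goroutine verbatim. A hypothetical helper that factors it out; the helper name is illustrative and the `<-chan *endure.Result` element type (with an error-valued `Error` field) is assumed from the `e.Error.Error()` usage in the tests:

```go
// Hypothetical helper, not part of the commit: drains the endure serve
// channel until stopCh fires, then stops the container.
func waitForStop(t *testing.T, cont *endure.Endure, ch <-chan *endure.Result,
	stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case e := <-ch:
			assert.Fail(t, "error", e.Error.Error())
			if err := cont.Stop(); err != nil {
				assert.FailNow(t, "error", err.Error())
			}
		case <-stopCh:
			if err := cont.Stop(); err != nil {
				assert.FailNow(t, "error", err.Error())
			}
			return
		}
	}
}
```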
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index aebe9da1..31b80e35 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -134,6 +135,10 @@ func TestBeanstalkDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
@@ -235,6 +240,10 @@ func TestBeanstalkJobsError(t *testing.T) {
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
@@ -312,6 +321,138 @@ func TestBeanstalkJobsError(t *testing.T) {
wg.Wait()
}
+func TestBeanstalkStats(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "beanstalk/.rr-beanstalk-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "beanstalk", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("beanstalk reserve timeout", "warn", "reserve-with-timeout").AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("beanstalk listener stopped").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &beanstalk.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclarePipeline", declareBeanstalkPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PausePipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second * 3)
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 10))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 5)
+
+ out := &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "beanstalk")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 15)
+
+ out = &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "beanstalk")
+ assert.Equal(t, out.Queue, "default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func TestBeanstalkNoGlobalSection(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go
index 0a882556..162579c8 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_ephemeral_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -129,6 +130,12 @@ func TestEphemeralDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
err = cont.RegisterAll(
@@ -201,8 +208,6 @@ func TestEphemeralDeclare(t *testing.T) {
time.Sleep(time.Second * 5)
stopCh <- struct{}{}
wg.Wait()
-
- time.Sleep(time.Second * 5)
}
func TestEphemeralPauseResume(t *testing.T) {
@@ -224,7 +229,13 @@ func TestEphemeralPauseResume(t *testing.T) {
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(3)
+
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-local", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -290,14 +301,15 @@ func TestEphemeralPauseResume(t *testing.T) {
time.Sleep(time.Second * 3)
+ t.Run("ephemeralResume", resumePipes("test-local"))
t.Run("ephemeralPause", pausePipelines("test-local"))
t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
t.Run("ephemeralResume", resumePipes("test-local"))
t.Run("pushToEnabledPipe", pushToPipe("test-local"))
-
time.Sleep(time.Second * 1)
stopCh <- struct{}{}
+ time.Sleep(time.Second)
wg.Wait()
}
@@ -319,6 +331,12 @@ func TestEphemeralJobsError(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
@@ -394,6 +412,135 @@ func TestEphemeralJobsError(t *testing.T) {
wg.Wait()
}
+func TestEphemeralStats(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "ephemeral/.rr-ephemeral-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "ephemeral", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseEphemeralPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+
+ t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+
+ time.Sleep(time.Second)
+ out := &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Queue, "test-3")
+
+ assert.Equal(t, out.Active, int64(1))
+ assert.Equal(t, out.Delayed, int64(1))
+ assert.Equal(t, out.Reserved, int64(0))
+
+ time.Sleep(time.Second)
+ t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "ephemeral")
+ assert.Equal(t, out.Queue, "test-3")
+
+ assert.Equal(t, out.Active, int64(0))
+ assert.Equal(t, out.Delayed, int64(0))
+ assert.Equal(t, out.Reserved, int64(0))
+
+ t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func declareEphemeralPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index 22b80157..5d8d8d9c 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -1,6 +1,8 @@
package jobs
import (
+ "io/ioutil"
+ "net/http"
"os"
"os/signal"
"sync"
@@ -15,6 +17,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
"github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -119,3 +122,125 @@ func TestJobsInit(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()
}
+
+func TestJOBSMetrics(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = "configs/.rr-jobs-metrics.yaml"
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &server.Plugin{},
+ &jobs.Plugin{},
+ &metrics.Plugin{},
+ &ephemeral.Plugin{},
+ mockLogger,
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Minute * 3)
+
+ go func() {
+ defer tt.Stop()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 2)
+
+ t.Run("DeclareEphemeralPipeline", declareEphemeralPipe)
+ t.Run("ConsumeEphemeralPipeline", consumeEphemeralPipe)
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PushEphemeralPipeline", pushToPipeDelayed("test-3", 5))
+ time.Sleep(time.Second)
+ t.Run("PushEphemeralPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 5)
+
+ genericOut, err := get()
+ assert.NoError(t, err)
+
+ assert.Contains(t, genericOut, `rr_jobs_jobs_err 0`)
+ assert.Contains(t, genericOut, `rr_jobs_jobs_ok 3`)
+ assert.Contains(t, genericOut, `rr_jobs_push_err 0`)
+ assert.Contains(t, genericOut, `rr_jobs_push_ok 3`)
+ assert.Contains(t, genericOut, "workers_memory_bytes")
+
+ close(sig)
+ time.Sleep(time.Second * 2)
+}
+
+const getAddr = "http://127.0.0.1:2112/metrics"
+
+// get issues an HTTP GET to the metrics endpoint and returns the response body
+func get() (string, error) {
+ r, err := http.Get(getAddr)
+ if err != nil {
+ return "", err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", err
+ }
+	// plain (copying) []byte-to-string conversion
+	return string(b), nil
+}
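
TestJOBSMetrics only checks for substrings in the Prometheus text exposition returned by get(). If a test needed the numeric value of a single series, a small scan over the text format would do; a sketch (the helper name is illustrative, and it assumes the standard bufio and strings imports):

```go
// metricValue scans the Prometheus text exposition for an exact series name
// and returns its value, e.g. metricValue(body, "rr_jobs_jobs_ok") -> "3".
// Illustrative only, not part of the commit.
func metricValue(body, name string) (string, bool) {
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, name+" ") {
			return strings.TrimSpace(strings.TrimPrefix(line, name)), true
		}
	}
	return "", false
}
```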
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index 80c5e8f4..839e1aaa 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
@@ -51,7 +52,8 @@ func TestSQSInit(t *testing.T) {
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("sqs listener stopped").Times(2)
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
err = cont.RegisterAll(
cfg,
@@ -133,10 +135,15 @@ func TestSQSDeclare(t *testing.T) {
mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("sqs listener stopped").Times(1)
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
err = cont.RegisterAll(
cfg,
@@ -231,9 +238,14 @@ func TestSQSJobsError(t *testing.T) {
mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Warn("sqs listener stopped").Times(1)
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
err = cont.RegisterAll(
cfg,
@@ -339,6 +351,136 @@ func TestSQSNoGlobalSection(t *testing.T) {
require.Error(t, err)
}
+func TestSQSStat(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "sqs/.rr-sqs-declare.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+ mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "sqs", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &sqs.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclareSQSPipeline", declareSQSPipe)
+ t.Run("ConsumeSQSPipeline", resumePipes("test-3"))
+ t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+ t.Run("PauseSQSPipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second)
+
+ t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5))
+ t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second)
+
+ out := &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "sqs")
+ assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default")
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, out.Pipeline, "test-3")
+ assert.Equal(t, out.Driver, "sqs")
+ assert.Equal(t, out.Queue, "http://127.0.0.1:9324/000000000000/default")
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+
+ t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
func declareSQSPipe(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:6001")
assert.NoError(t, err)
diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go
index 627b32f0..f50d34cc 100644
--- a/tests/plugins/jobs/jobs_with_toxics_test.go
+++ b/tests/plugins/jobs/jobs_with_toxics_test.go
@@ -56,6 +56,12 @@ func TestDurabilityAMQP(t *testing.T) {
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-1", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Error("job push error, job might be lost", "error", gomock.Any(), "pipeline", "test-2", "ID", gomock.Any(), "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(4)
// redial errors
@@ -176,13 +182,16 @@ func TestDurabilitySQS(t *testing.T) {
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
- mockLogger.EXPECT().Warn("sqs listener stopped").MinTimes(2)
-
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
// redial errors
mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-2", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("queues and subscribers redeclared successfully").AnyTimes()
+
+ // stop
+ mockLogger.EXPECT().Warn("sqs listener stopped").AnyTimes()
+ mockLogger.EXPECT().Info("------> job poller stopped <------").AnyTimes()
err = cont.RegisterAll(
cfg,
@@ -246,8 +255,9 @@ func TestDurabilitySQS(t *testing.T) {
time.Sleep(time.Second * 3)
go func() {
- time.Sleep(time.Second * 2)
+ time.Sleep(time.Second)
t.Run("PushPipelineWhileRedialing-1", pushToPipe("test-1"))
+ time.Sleep(time.Second)
t.Run("PushPipelineWhileRedialing-2", pushToPipe("test-2"))
}()
@@ -291,9 +301,12 @@ func TestDurabilityBeanstalk(t *testing.T) {
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
// redial errors
mockLogger.EXPECT().Info("beanstalk redial was successful").MinTimes(2)
mockLogger.EXPECT().Error("pipeline error", "pipeline", "test-1", "error", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).AnyTimes()
diff --git a/tests/plugins/metrics/docker-compose.yml b/tests/plugins/metrics/docker-compose.yml
deleted file mode 100644
index 610633b4..00000000
--- a/tests/plugins/metrics/docker-compose.yml
+++ /dev/null
@@ -1,7 +0,0 @@
-version: '3.7'
-
-services:
- prometheus:
- image: prom/prometheus
- ports:
- - 9090:9090
diff --git a/tests/plugins/reload/configs/.rr-reload-2.yaml b/tests/plugins/reload/configs/.rr-reload-2.yaml
index fd1fe417..6a9d7582 100644
--- a/tests/plugins/reload/configs/.rr-reload-2.yaml
+++ b/tests/plugins/reload/configs/.rr-reload-2.yaml
@@ -27,7 +27,7 @@ logs:
mode: development
level: debug
reload:
- interval: 2s
+ interval: 1s
patterns:
- .txt
services:
diff --git a/tests/plugins/reload/reload_plugin_test.go b/tests/plugins/reload/reload_plugin_test.go
index b5223b9c..21c27e49 100644
--- a/tests/plugins/reload/reload_plugin_test.go
+++ b/tests/plugins/reload/reload_plugin_test.go
@@ -50,11 +50,11 @@ func TestReloadInit(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).MinTimes(1)
- mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).Times(1)
- mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1)
- mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1)
- mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").Times(1)
- mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1)
+ mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", "file.txt", "size", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1)
+ mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1)
+ mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").MinTimes(1)
+ mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // placeholder for the workerlogerror
err = cont.RegisterAll(
@@ -258,10 +258,10 @@ func TestReloadFilterFileExt(t *testing.T) {
mockLogger.EXPECT().Debug("file was created", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(100)
mockLogger.EXPECT().Debug("file was added to watcher", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).MinTimes(1)
mockLogger.EXPECT().Debug("file added to the list of removed files", "path", gomock.Any(), "name", gomock.Any(), "size", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").Times(1)
- mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").Times(1)
+ mockLogger.EXPECT().Info("HTTP plugin got restart request. Restarting...").MinTimes(1)
+ mockLogger.EXPECT().Info("HTTP workers Pool successfully restarted").MinTimes(1)
mockLogger.EXPECT().Info("HTTP handler listeners successfully re-added").MinTimes(1)
- mockLogger.EXPECT().Info("HTTP plugin successfully restarted").Times(1)
+ mockLogger.EXPECT().Info("HTTP plugin successfully restarted").MinTimes(1)
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() // placeholder for the workerlogerror
err = cont.RegisterAll(
@@ -317,6 +317,7 @@ func TestReloadFilterFileExt(t *testing.T) {
time.Sleep(time.Second * 3)
t.Run("ReloadMakeFiles", reloadMakeFiles)
+ time.Sleep(time.Second * 2)
t.Run("ReloadFilteredExt", reloadFilteredExt)
time.Sleep(time.Second * 10)
@@ -460,10 +461,15 @@ func TestReloadCopy100(t *testing.T) {
time.Sleep(time.Second * 3)
t.Run("ReloadMake100Files", reloadMake100Files)
+ time.Sleep(time.Second * 2)
t.Run("ReloadCopyFiles", reloadCopyFiles)
+ time.Sleep(time.Second * 2)
t.Run("ReloadRecursiveDirsSupport", copyFilesRecursive)
+ time.Sleep(time.Second * 2)
t.Run("RandomChangesInRecursiveDirs", randomChangesInRecursiveDirs)
+ time.Sleep(time.Second * 2)
t.Run("RemoveFilesSupport", removeFilesSupport)
+ time.Sleep(time.Second * 2)
t.Run("ReloadMoveSupport", reloadMoveSupport)
time.Sleep(time.Second * 10)
@@ -733,7 +739,7 @@ func TestReloadNoRecursion(t *testing.T) {
time.Sleep(time.Second * 3)
t.Run("ReloadMakeFiles", reloadMakeFiles) // make files in the testDir
- time.Sleep(time.Second * 3)
+ time.Sleep(time.Second * 2)
t.Run("ReloadCopyFilesRecursive", reloadCopyFiles)
time.Sleep(time.Second * 3)
assert.NoError(t, freeResources(testDir))
diff --git a/tests/plugins/resetter/resetter_test.go b/tests/plugins/resetter/resetter_test.go
index 465d22dd..10f38a9c 100644
--- a/tests/plugins/resetter/resetter_test.go
+++ b/tests/plugins/resetter/resetter_test.go
@@ -41,11 +41,6 @@ func TestResetterInit(t *testing.T) {
mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("started List method").MinTimes(1)
- mockLogger.EXPECT().Debug("services list", "services", []string{"resetter.plugin1"}).MinTimes(1)
- mockLogger.EXPECT().Debug("finished List method").MinTimes(1)
- mockLogger.EXPECT().Debug("started Reset method for the service", "service", "resetter.plugin1").MinTimes(1)
- mockLogger.EXPECT().Debug("finished Reset method for the service", "service", "resetter.plugin1").MinTimes(1)
mockLogger.EXPECT().Warn("listener accept error, connection closed", "error", gomock.Any()).AnyTimes()
err = cont.RegisterAll(
diff --git a/tests/plugins/service/service_plugin_test.go b/tests/plugins/service/service_plugin_test.go
index 8948a458..ddf54520 100644
--- a/tests/plugins/service/service_plugin_test.go
+++ b/tests/plugins/service/service_plugin_test.go
@@ -1,3 +1,4 @@
+//go:build linux
// +build linux
package service
diff --git a/tests/supervised.php b/tests/supervised.php
new file mode 100644
index 00000000..74f8c994
--- /dev/null
+++ b/tests/supervised.php
@@ -0,0 +1,14 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+$mem = '';
+while ($rr->waitPayload()) {
+ $rr->respond(new \Spiral\RoadRunner\Payload(""));
+}
diff --git a/tests/worker.php b/tests/temporal-worker.php
index 5c9c80e6..5c9c80e6 100644
--- a/tests/worker.php
+++ b/tests/temporal-worker.php
diff --git a/utils/isolate.go b/utils/isolate.go
index b05f4b3a..202f538c 100755
--- a/utils/isolate.go
+++ b/utils/isolate.go
@@ -1,3 +1,4 @@
+//go:build !windows
// +build !windows
package utils
diff --git a/utils/isolate_win.go b/utils/isolate_win.go
index b2b213a8..6b6d22e0 100755
--- a/utils/isolate_win.go
+++ b/utils/isolate_win.go
@@ -1,3 +1,4 @@
+//go:build windows
// +build windows
package utils
diff --git a/utils/network.go b/utils/network.go
index 86a7e733..301e2bab 100755
--- a/utils/network.go
+++ b/utils/network.go
@@ -1,3 +1,4 @@
+//go:build linux || darwin || freebsd
// +build linux darwin freebsd
package utils
diff --git a/utils/network_windows.go b/utils/network_windows.go
index 6eefb8f7..88e0fdb6 100755
--- a/utils/network_windows.go
+++ b/utils/network_windows.go
@@ -1,3 +1,4 @@
+//go:build windows
// +build windows
package utils