-rw-r--r--  .dockerignore | 2
-rw-r--r--  .github/workflows/linux.yml | 8
-rwxr-xr-x  CODE_OF_CONDUCT.md | 2
-rwxr-xr-x  Makefile | 82
-rw-r--r--  common/doc.go | 9
-rw-r--r--  common/jobs/interface.go | 24
-rw-r--r--  common/kv/interface.go (renamed from plugins/kv/interface.go) | 0
-rw-r--r--  common/pubsub/interface.go (renamed from pkg/pubsub/interface.go) | 0
-rw-r--r--  common/pubsub/psmessage.go (renamed from pkg/pubsub/psmessage.go) | 0
-rw-r--r--  go.mod | 6
-rw-r--r--  go.sum | 23
-rwxr-xr-x  pkg/events/general.go | 2
-rw-r--r--  pkg/events/interface.go | 4
-rw-r--r--  pkg/events/jobs_events.go | 89
-rw-r--r--  pkg/events/pool_events.go | 2
-rw-r--r--  pkg/events/worker_events.go | 2
-rw-r--r--  pkg/pool/config.go | 2
-rwxr-xr-x  pkg/pool/static_pool.go | 4
-rwxr-xr-x  pkg/pool/static_pool_test.go | 34
-rw-r--r--  pkg/pool/supervisor_test.go | 12
-rw-r--r--  pkg/priority_queue/binary_heap.go | 125
-rw-r--r--  pkg/priority_queue/binary_heap_test.go | 124
-rw-r--r--  pkg/priority_queue/interface.go | 28
-rw-r--r--  pkg/process/state.go | 8
-rwxr-xr-x  pkg/worker/sync_worker.go | 2
-rw-r--r--  pkg/worker_watcher/interface.go (renamed from pkg/worker_watcher/container/interface.go) | 3
-rw-r--r--  plugins/broadcast/interface.go | 2
-rw-r--r--  plugins/broadcast/plugin.go | 2
-rw-r--r--  plugins/broadcast/rpc.go | 2
-rw-r--r--  plugins/http/plugin.go | 8
-rw-r--r--  plugins/informer/interface.go | 2
-rw-r--r--  plugins/informer/plugin.go | 2
-rw-r--r--  plugins/informer/rpc.go | 16
-rw-r--r--  plugins/jobs/config.go | 55
-rw-r--r--  plugins/jobs/doc/jobs_arch.drawio | 1
-rw-r--r--  plugins/jobs/drivers/amqp/config.go | 58
-rw-r--r--  plugins/jobs/drivers/amqp/consumer.go | 416
-rw-r--r--  plugins/jobs/drivers/amqp/item.go | 187
-rw-r--r--  plugins/jobs/drivers/amqp/listener.go | 25
-rw-r--r--  plugins/jobs/drivers/amqp/plugin.go | 40
-rw-r--r--  plugins/jobs/drivers/amqp/rabbit_init.go | 65
-rw-r--r--  plugins/jobs/drivers/amqp/redial.go | 126
-rw-r--r--  plugins/jobs/drivers/ephemeral/consumer.go | 204
-rw-r--r--  plugins/jobs/drivers/ephemeral/item.go | 112
-rw-r--r--  plugins/jobs/drivers/ephemeral/plugin.go | 41
-rw-r--r--  plugins/jobs/drivers/sqs/config.go | 103
-rw-r--r--  plugins/jobs/drivers/sqs/consumer.go | 229
-rw-r--r--  plugins/jobs/drivers/sqs/item.go | 227
-rw-r--r--  plugins/jobs/drivers/sqs/listener.go | 66
-rw-r--r--  plugins/jobs/drivers/sqs/plugin.go | 39
-rw-r--r--  plugins/jobs/job/general.go | 31
-rw-r--r--  plugins/jobs/job/job_options.go | 74
-rw-r--r--  plugins/jobs/job/job_options_test.go | 110
-rw-r--r--  plugins/jobs/pipeline/pipeline.go | 78
-rw-r--r--  plugins/jobs/pipeline/pipeline_test.go | 21
-rw-r--r--  plugins/jobs/plugin.go | 525
-rw-r--r--  plugins/jobs/rpc.go | 119
-rw-r--r--  plugins/kv/drivers/boltdb/driver.go | 3
-rw-r--r--  plugins/kv/drivers/boltdb/plugin.go | 9
-rw-r--r--  plugins/kv/drivers/memcached/driver.go | 3
-rw-r--r--  plugins/kv/drivers/memcached/plugin.go | 9
-rw-r--r--  plugins/kv/plugin.go | 12
-rw-r--r--  plugins/kv/rpc.go | 3
-rw-r--r--  plugins/memory/kv.go | 3
-rw-r--r--  plugins/memory/plugin.go | 5
-rw-r--r--  plugins/memory/pubsub.go | 4
-rw-r--r--  plugins/redis/channel.go | 2
-rw-r--r--  plugins/redis/kv.go | 3
-rw-r--r--  plugins/redis/plugin.go | 4
-rw-r--r--  plugins/redis/pubsub.go | 4
-rw-r--r--  plugins/server/interface.go | 2
-rw-r--r--  plugins/server/plugin.go | 43
-rw-r--r--  plugins/websockets/executor/executor.go | 2
-rw-r--r--  plugins/websockets/plugin.go | 10
-rw-r--r--  plugins/websockets/pool/workers_pool.go | 2
-rw-r--r--  plugins/websockets/validator/access_validator.go | 9
-rw-r--r--  proto/jobs/v1beta/jobs.pb.go | 676
-rw-r--r--  proto/jobs/v1beta/jobs.proto | 49
-rw-r--r--  proto/kv/v1beta/kv.pb.go | 9
-rw-r--r--  proto/websockets/v1beta/websockets.pb.go | 9
-rw-r--r--  tests/Dockerfile | 0
-rw-r--r--  tests/composer.json | 2
-rw-r--r--  tests/docker-compose.yaml | 15
-rw-r--r--  tests/env/Dockerfile-elastic-mq.yaml | 9
-rw-r--r--  tests/env/custom.conf | 8
-rw-r--r--  tests/env/docker-compose.yaml | 37
-rw-r--r--  tests/plugins/broadcast/broadcast_plugin_test.go | 21
-rw-r--r--  tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml | 2
-rw-r--r--  tests/plugins/broadcast/configs/.rr-broadcast-global.yaml | 9
-rw-r--r--  tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml | 9
-rw-r--r--  tests/plugins/broadcast/plugins/plugin1.go | 29
-rw-r--r--  tests/plugins/broadcast/plugins/plugin2.go | 27
-rw-r--r--  tests/plugins/broadcast/plugins/plugin3.go | 29
-rw-r--r--  tests/plugins/broadcast/plugins/plugin4.go | 30
-rw-r--r--  tests/plugins/broadcast/plugins/plugin5.go | 30
-rw-r--r--  tests/plugins/broadcast/plugins/plugin6.go | 30
-rw-r--r--  tests/plugins/headers/configs/.rr-cors-headers.yaml | 4
-rw-r--r--  tests/plugins/http/configs/.rr-env.yaml | 6
-rw-r--r--  tests/plugins/http/configs/.rr-http.yaml | 4
-rw-r--r--  tests/plugins/http/handler_test.go | 54
-rw-r--r--  tests/plugins/http/uploads_test.go | 8
-rw-r--r--  tests/plugins/informer/test_plugin.go | 6
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml | 75
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-init.yaml | 93
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-list.yaml | 91
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml | 78
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml | 78
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml | 44
-rw-r--r--  tests/plugins/jobs/configs/.rr-jobs-test.yaml | 105
-rw-r--r--  tests/plugins/jobs/jobs_plugin_test.go | 404
-rw-r--r--  tests/plugins/resetter/test_plugin.go | 2
-rw-r--r--  tests/plugins/server/plugin_pipes.go | 2
-rw-r--r--  tests/psr-worker-bench.php | 2
-rw-r--r--  tests/worker-cors.php | 15
-rw-r--r--  utils/pointers.go | 15
115 files changed, 5523 insertions, 308 deletions
diff --git a/.dockerignore b/.dockerignore
index bfa82a3d..b817b3c8 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -7,4 +7,4 @@
/tests
/bin
composer.json
-vendor_php
\ No newline at end of file
+vendor_php
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 62987771..583d97f3 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -66,16 +66,19 @@ jobs:
- name: Run golang tests on Linux
run: |
- docker-compose -f ./tests/docker-compose.yaml up -d
+ docker-compose -f ./tests/env/docker-compose.yaml up -d
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.txt -covermode=atomic ./pkg/transport/pipe
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.txt -covermode=atomic ./pkg/transport/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.txt -covermode=atomic ./pkg/pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.txt -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.txt -covermode=atomic ./pkg/bst
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.txt -covermode=atomic ./pkg/priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.txt -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http_config.txt -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/server_cmd.txt -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/struct_jobs.txt -covermode=atomic ./plugins/jobs/structs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipeline_jobs.txt -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/http.txt -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/informer.txt -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/reload.txt -covermode=atomic ./tests/plugins/reload
@@ -94,7 +97,8 @@ jobs:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/broadcast_plugin.txt -covermode=atomic ./tests/plugins/broadcast
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/websockets.txt -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/ws_origin.txt -covermode=atomic ./plugins/websockets
- docker-compose -f ./tests/docker-compose.yaml down
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/jobs_core.txt -covermode=atomic ./tests/plugins/jobs
+ docker-compose -f ./tests/env/docker-compose.yaml down
cat ./coverage-ci/*.txt > ./coverage-ci/summary.txt
- uses: codecov/codecov-action@v1 # Docs: <https://github.com/codecov/codecov-action>
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
index ae0b283a..49aeb3c8 100755
--- a/CODE_OF_CONDUCT.md
+++ b/CODE_OF_CONDUCT.md
@@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
-[version]: http://contributor-covenant.org/version/1/4/
+[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/
diff --git a/Makefile b/Makefile
index bcfce79d..a63d8093 100755
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@
SHELL = /bin/sh
test_coverage:
- docker-compose -f tests/docker-compose.yaml up -d --remove-orphans
+ docker-compose -f tests/env/docker-compose.yaml up -d --remove-orphans
rm -rf coverage
mkdir coverage
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipe.out -covermode=atomic ./pkg/transport/pipe
@@ -14,9 +14,12 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker.out -covermode=atomic ./pkg/worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/worker_stack.out -covermode=atomic ./pkg/worker_watcher
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/bst.out -covermode=atomic ./pkg/bst
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq.out -covermode=atomic ./pkg/priority_queue
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/struct_jobs.out -covermode=atomic ./plugins/jobs/structs
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http_config.out -covermode=atomic ./plugins/http/config
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server_cmd.out -covermode=atomic ./plugins/server
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/http.out -covermode=atomic ./tests/plugins/http
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/informer.out -covermode=atomic ./tests/plugins/informer
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/reload.out -covermode=atomic ./tests/plugins/reload
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/server.out -covermode=atomic ./tests/plugins/server
@@ -34,20 +37,24 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/broadcast_plugin.out -covermode=atomic ./tests/plugins/broadcast
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_plugin.out -covermode=atomic ./tests/plugins/websockets
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/ws_origin.out -covermode=atomic ./plugins/websockets
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/jobs_core.out -covermode=atomic ./tests/plugins/jobs
cat ./coverage/*.out > ./coverage/summary.out
- docker-compose -f tests/docker-compose.yaml down
+ docker-compose -f tests/env/docker-compose.yaml down
test: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
+ docker-compose -f tests/env/docker-compose.yaml up -d
go test -v -race -tags=debug ./pkg/transport/pipe
go test -v -race -tags=debug ./pkg/transport/socket
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
go test -v -race -tags=debug ./pkg/bst
- go test -v -race -tags=debug ./tests/plugins/http
+ go test -v -race -tags=debug ./pkg/priority_queue
+ go test -v -race -tags=debug ./plugins/jobs/job
+ go test -v -race -tags=debug ./plugins/jobs/pipeline
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./plugins/server
+ go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./tests/plugins/informer
go test -v -race -tags=debug ./tests/plugins/reload
go test -v -race -tags=debug ./tests/plugins/server
@@ -65,34 +72,39 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/broadcast
go test -v -race -tags=debug ./tests/plugins/websockets
go test -v -race -tags=debug ./plugins/websockets
- docker-compose -f tests/docker-compose.yaml down
+ go test -v -race -tags=debug ./tests/plugins/jobs
+ docker-compose -f tests/env/docker-compose.yaml down
-testGo1.17beta1: ## Run application tests
- docker-compose -f tests/docker-compose.yaml up -d
- go1.17beta1 test -v -race -tags=debug ./pkg/transport/pipe
- go1.17beta1 test -v -race -tags=debug ./pkg/transport/socket
- go1.17beta1 test -v -race -tags=debug ./pkg/pool
- go1.17beta1 test -v -race -tags=debug ./pkg/worker
- go1.17beta1 test -v -race -tags=debug ./pkg/worker_watcher
- go1.17beta1 test -v -race -tags=debug ./pkg/bst
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/http
- go1.17beta1 test -v -race -tags=debug ./plugins/http/config
- go1.17beta1 test -v -race -tags=debug ./plugins/server
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/informer
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/reload
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/server
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/service
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/status
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/config
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/gzip
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/headers
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/logger
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/metrics
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/redis
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/resetter
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/rpc
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/kv
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/websockets
- go1.17beta1 test -v -race -tags=debug ./tests/plugins/broadcast
- go1.17beta1 test -v -race -tags=debug ./plugins/websockets
- docker-compose -f tests/docker-compose.yaml down
+testGo1.17rc1: ## Run application tests
+ docker-compose -f tests/env/docker-compose.yaml up -d
+ go1.17rc1 test -v -race -tags=debug ./pkg/transport/pipe
+ go1.17rc1 test -v -race -tags=debug ./pkg/transport/socket
+ go1.17rc1 test -v -race -tags=debug ./pkg/pool
+ go1.17rc1 test -v -race -tags=debug ./pkg/worker
+ go1.17rc1 test -v -race -tags=debug ./pkg/worker_watcher
+ go1.17rc1 test -v -race -tags=debug ./pkg/bst
+ go1.17rc1 test -v -race -tags=debug ./pkg/priority_queue
+ go1.17rc1 test -v -race -tags=debug ./plugins/jobs/job
+ go1.17rc1 test -v -race -tags=debug ./plugins/jobs/pipeline
+ go1.17rc1 test -v -race -tags=debug ./plugins/http/config
+ go1.17rc1 test -v -race -tags=debug ./plugins/server
+ go1.17rc1 test -v -race -tags=debug ./plugins/websockets
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/http
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/informer
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/reload
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/server
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/service
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/status
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/config
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/gzip
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/headers
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/logger
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/metrics
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/redis
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/resetter
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/rpc
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/kv
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/websockets
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/broadcast
+ go1.17rc1 test -v -race -tags=debug ./tests/plugins/jobs
+ docker-compose -f tests/env/docker-compose.yaml down
diff --git a/common/doc.go b/common/doc.go
new file mode 100644
index 00000000..adc03351
--- /dev/null
+++ b/common/doc.go
@@ -0,0 +1,9 @@
+/*
+Package common is used to collect common interfaces/structures which might be implemented (or imported) by different plugins.
+For example, 'pubsub' interface might be implemented by memory, redis, websockets and many other plugins.
+
+Folders:
+- kv - contains KV interfaces and structures
+- pubsub - contains pub-sub interfaces and structures
+*/
+package common
diff --git a/common/jobs/interface.go b/common/jobs/interface.go
new file mode 100644
index 00000000..f90c9c21
--- /dev/null
+++ b/common/jobs/interface.go
@@ -0,0 +1,24 @@
+package jobs
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+// Consumer todo naming
+type Consumer interface {
+ Push(job *job.Job) error
+ Register(pipeline *pipeline.Pipeline) error
+ Run(pipeline *pipeline.Pipeline) error
+ Stop() error
+
+ Pause(pipeline string)
+ Resume(pipeline string)
+}
+
+type Constructor interface {
+ JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
+ FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (Consumer, error)
+}
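
For orientation, here is a minimal sketch of a driver that would satisfy the Consumer interface above. It is illustrative only and not part of this changeset; the nullConsumer name and its no-op behaviour are hypothetical.

	package null

	import (
		"github.com/spiral/roadrunner/v2/plugins/jobs/job"
		"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
	)

	// nullConsumer is a hypothetical driver that accepts jobs and silently drops them.
	type nullConsumer struct{}

	func (n *nullConsumer) Push(j *job.Job) error               { return nil } // accept and discard the job
	func (n *nullConsumer) Register(p *pipeline.Pipeline) error { return nil } // remember the pipeline
	func (n *nullConsumer) Run(p *pipeline.Pipeline) error      { return nil } // start serving the pipeline
	func (n *nullConsumer) Stop() error                         { return nil } // stop serving
	func (n *nullConsumer) Pause(p string)                      {}             // pause the named pipeline
	func (n *nullConsumer) Resume(p string)                     {}             // resume the named pipeline
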
diff --git a/plugins/kv/interface.go b/common/kv/interface.go
index 5736a6a7..5736a6a7 100644
--- a/plugins/kv/interface.go
+++ b/common/kv/interface.go
diff --git a/pkg/pubsub/interface.go b/common/pubsub/interface.go
index 06252d70..06252d70 100644
--- a/pkg/pubsub/interface.go
+++ b/common/pubsub/interface.go
diff --git a/pkg/pubsub/psmessage.go b/common/pubsub/psmessage.go
index e33d9284..e33d9284 100644
--- a/pkg/pubsub/psmessage.go
+++ b/common/pubsub/psmessage.go
diff --git a/go.mod b/go.mod
index 2ac9684c..1cedb379 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,12 @@ go 1.16
require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/alicebob/miniredis/v2 v2.14.5
+ github.com/aws/aws-sdk-go-v2 v1.7.0
+ github.com/aws/aws-sdk-go-v2/config v1.4.1
+ github.com/aws/aws-sdk-go-v2/credentials v1.3.0
+ github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
+ github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
@@ -22,6 +27,7 @@ require (
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.4
+ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index f218097f..fbaf6411 100644
--- a/go.sum
+++ b/go.sum
@@ -43,6 +43,26 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
+github.com/aws/aws-sdk-go-v2 v1.7.0 h1:UYGnoIPIzed+ycmgw8Snb/0HK+KlMD+SndLTneG8ncE=
+github.com/aws/aws-sdk-go-v2 v1.7.0/go.mod h1:tb9wi5s61kTDA5qCkcDbt3KRVV74GGslQkl/DRdX/P4=
+github.com/aws/aws-sdk-go-v2/config v1.4.1 h1:PcGp9Kf+1dHJmP3EIDZJmAmWfGABFTU0obuvYQNzWH8=
+github.com/aws/aws-sdk-go-v2/config v1.4.1/go.mod h1:HCDWZ/oeY59TPtXslxlbkCqLQBsVu6b09kiG43tdP+I=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0 h1:vXxTINCsHn6LKhR043jwSLd6CsL7KOEU7b1woMr1K1A=
+github.com/aws/aws-sdk-go-v2/credentials v1.3.0/go.mod h1:tOcv+qDZ0O+6Jk2beMl5JnZX6N0H7O8fw9UsD3bP7GI=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0 h1:ucExzYCoAiL9GpKOsKkQLsa43wTT23tcdP4cDTSbZqY=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.2.0/go.mod h1:XvzoGzuS0kKPzCQtJCC22Xh/mMgVAzfGo/0V+mk/Cu0=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0 h1:DJq/vXXF+LAFaa/kQX9C6arlf4xX4uaaqGWIyAKOCpM=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.1.0/go.mod h1:qGQ/9IfkZonRNSNLE99/yBJ7EPA/h8jlWEqtJCcaj+Q=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0 h1:g2npzssI/6XsoQaPYCxliMFeC5iNKKvO0aC+/wWOE0A=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.0/go.mod h1:a7XLWNKuVgOxjssEF019IiHPv35k8KHBaWv/wJAfi2A=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0 h1:45YlPhQ/U5v8QnzJFD1bWlTT4IA2NQ9tQ2D/AfyIX3Q=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.6.0/go.mod h1:8iLn005F6ASRIXmp6U4hfRAk8EHAtRPrx1oHyxxz2xg=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0 h1:DMi9w+TpUam7eJ8ksL7svfzpqpqem2MkDAJKW8+I2/k=
+github.com/aws/aws-sdk-go-v2/service/sso v1.3.0/go.mod h1:qWR+TUuvfji9udM79e4CPe87C5+SjMEb2TFXkZaI0Vc=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0 h1:Y1K9dHE2CYOWOvaJSIITq4mJfLX43iziThTvqs5FqOg=
+github.com/aws/aws-sdk-go-v2/service/sts v1.5.0/go.mod h1:HjDKUmissf6Mlut+WzG2r35r6LeTKmLEDJ6p9NryzLg=
+github.com/aws/smithy-go v1.5.0 h1:2grDq7LxZlo8BZUDeqRfQnQWLZpInmh2TLPPkJku3YM=
+github.com/aws/smithy-go v1.5.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -208,6 +228,8 @@ github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmK
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@@ -391,6 +413,7 @@ github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdU
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
+github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/pkg/events/general.go b/pkg/events/general.go
index a09a8759..5cf13e10 100755
--- a/pkg/events/general.go
+++ b/pkg/events/general.go
@@ -4,6 +4,8 @@ import (
"sync"
)
+const UnknownEventType string = "Unknown event type"
+
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
listeners []Listener
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
index ac6c15a4..7d57e4d0 100644
--- a/pkg/events/interface.go
+++ b/pkg/events/interface.go
@@ -2,7 +2,7 @@ package events
// Handler interface
type Handler interface {
- // Return number of active listeners
+ // NumListeners return number of active listeners
NumListeners() int
// AddListener adds lister to the publisher
AddListener(listener Listener)
@@ -10,5 +10,5 @@ type Handler interface {
Push(e interface{})
}
-// Event listener listens for the events produced by worker, worker pool or other service.
+// Listener is an event listener (function type) which listens for the events produced by a worker, worker pool or other service.
type Listener func(event interface{})
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
new file mode 100644
index 00000000..9a7116ff
--- /dev/null
+++ b/pkg/events/jobs_events.go
@@ -0,0 +1,89 @@
+package events
+
+import (
+ "time"
+)
+
+const (
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK = iota + 12000
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeRun when a pipeline run has been requested.
+ EventPipeRun
+
+ EventInitialized
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when the driver is ready to accept/serve tasks.
+ EventDriverReady
+)
+
+type J int64
+
+func (ev J) String() string {
+ switch ev {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeRun:
+ return "EventPipeRun"
+ case EventInitialized:
+ return "EventInitialized"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represents a job event.
+type JobEvent struct {
+ Event J
+ // ID is the job ID.
+ ID string
+
+ // Pipeline name
+ Pipeline string
+
+ // Associated driver name (amqp, ephemeral, etc)
+ Driver string
+
+ // Error for the jobs/pipes errors
+ Error error
+
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
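
As a rough usage sketch (not part of this changeset), a listener wired into the events handler could react to the job events above as shown below; the NewEventsHandler constructor is assumed to exist in pkg/events and is not shown in this diff:

	package main

	import (
		"log"

		"github.com/spiral/roadrunner/v2/pkg/events"
	)

	func main() {
		eh := events.NewEventsHandler() // assumed constructor for HandlerImpl

		// report failed jobs; all other events are ignored
		eh.AddListener(func(event interface{}) {
			if je, ok := event.(events.JobEvent); ok && je.Event == events.EventJobError {
				log.Printf("job %s on pipeline %s (%s) failed after %s: %v",
					je.ID, je.Pipeline, je.Driver, je.Elapsed, je.Error)
			}
		})

		// a driver would push events roughly like this
		eh.Push(events.JobEvent{Event: events.EventJobError, ID: "1", Pipeline: "test-1", Driver: "amqp"})
	}
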
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index e7b451e0..4d4cae5d 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -57,7 +57,7 @@ func (ev P) String() string {
case EventPoolRestart:
return "EventPoolRestart"
}
- return "Unknown event type"
+ return UnknownEventType
}
// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 11bd6ab7..39c38e57 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -20,7 +20,7 @@ func (ev W) String() string {
case EventWorkerStderr:
return "EventWorkerStderr"
}
- return "Unknown event type"
+ return UnknownEventType
}
// WorkerEvent wraps worker events.
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 5a6247b5..f2f19795 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -26,7 +26,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- cfg Config
+ cfg *Config
// worker command creator
cmd Command
@@ -51,7 +51,7 @@ type StaticPool struct {
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
const op = errors.Op("static_pool_initialize")
if factory == nil {
return nil, errors.E(op, errors.Str("no factory initialized"))
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 6f875072..3df773ab 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
"github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -58,7 +58,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -204,7 +204,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Consume pool events
+ // Run pool events
ev := make(chan struct{}, 1)
listener := func(event interface{}) {
if pe, ok := event.(events.PoolEvent); ok {
@@ -214,7 +214,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg2 = Config{
+ var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -264,7 +264,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
@@ -283,7 +283,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
@@ -320,7 +320,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -360,7 +360,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -400,7 +400,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -422,7 +422,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -452,7 +452,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -506,7 +506,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
Debug: false,
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -539,7 +539,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -556,7 +556,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -595,7 +595,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
@@ -626,7 +626,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- Config{
+ &Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 1cd301ba..a321fdf0 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)
-var cfgSupervised = Config{
+var cfgSupervised = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -84,7 +84,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -177,7 +177,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
}
func TestSupervisedPool_Idle(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -225,7 +225,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
}
func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -267,7 +267,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
}
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -309,7 +309,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
}
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
- var cfgExecTTL = Config{
+ var cfgExecTTL = &Config{
NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
diff --git a/pkg/priority_queue/binary_heap.go b/pkg/priority_queue/binary_heap.go
new file mode 100644
index 00000000..fc043927
--- /dev/null
+++ b/pkg/priority_queue/binary_heap.go
@@ -0,0 +1,125 @@
+/*
+binary heap (min-heap) algorithm used as a core for the priority queue
+*/
+
+package priorityqueue
+
+import (
+ "sync"
+ "sync/atomic"
+)
+
+type BinHeap struct {
+ items []Item
+ // find a way to use pointer to the raw data
+ len uint64
+ maxLen uint64
+ cond sync.Cond
+}
+
+func NewBinHeap(maxLen uint64) *BinHeap {
+ return &BinHeap{
+ items: make([]Item, 0, 1000),
+ len: 0,
+ maxLen: maxLen,
+ cond: sync.Cond{L: &sync.Mutex{}},
+ }
+}
+
+func (bh *BinHeap) fixUp() {
+ k := bh.len - 1
+ p := (k - 1) >> 1 // k-1 / 2
+
+ for k > 0 {
+ cur, par := (bh.items)[k], (bh.items)[p]
+
+ if cur.Priority() < par.Priority() {
+ bh.swap(k, p)
+ k = p
+ p = (k - 1) >> 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) swap(i, j uint64) {
+ (bh.items)[i], (bh.items)[j] = (bh.items)[j], (bh.items)[i]
+}
+
+func (bh *BinHeap) fixDown(curr, end int) {
+ cOneIdx := (curr << 1) + 1
+ for cOneIdx <= end {
+ cTwoIdx := -1
+ if (curr<<1)+2 <= end {
+ cTwoIdx = (curr << 1) + 2
+ }
+
+ idxToSwap := cOneIdx
+ if cTwoIdx > -1 && (bh.items)[cTwoIdx].Priority() < (bh.items)[cOneIdx].Priority() {
+ idxToSwap = cTwoIdx
+ }
+ if (bh.items)[idxToSwap].Priority() < (bh.items)[curr].Priority() {
+ bh.swap(uint64(curr), uint64(idxToSwap))
+ curr = idxToSwap
+ cOneIdx = (curr << 1) + 1
+ } else {
+ return
+ }
+ }
+}
+
+func (bh *BinHeap) Len() uint64 {
+ return atomic.LoadUint64(&bh.len)
+}
+
+func (bh *BinHeap) Insert(item Item) {
+ bh.cond.L.Lock()
+
+ // check the binary heap len before insertion
+ if bh.Len() > bh.maxLen {
+ // unlock the mutex to proceed to get-max
+ bh.cond.L.Unlock()
+
+ // signal waiting goroutines
+ for bh.Len() > 0 {
+ // signal waiting goroutines
+ bh.cond.Signal()
+ }
+ // lock mutex to proceed inserting into the empty slice
+ bh.cond.L.Lock()
+ }
+
+ bh.items = append(bh.items, item)
+
+ // add len to the slice
+ atomic.AddUint64(&bh.len, 1)
+
+ // fix binary heap up
+ bh.fixUp()
+ bh.cond.L.Unlock()
+
+ // signal the goroutine on wait
+ bh.cond.Signal()
+}
+
+func (bh *BinHeap) ExtractMin() Item {
+ bh.cond.L.Lock()
+
+ // if len == 0, wait for the signal
+ for bh.Len() == 0 {
+ bh.cond.Wait()
+ }
+
+ bh.swap(0, bh.len-1)
+
+ item := (bh.items)[int(bh.len)-1]
+ bh.items = (bh).items[0 : int(bh.len)-1]
+ bh.fixDown(0, int(bh.len-2))
+
+ // reduce len
+ atomic.AddUint64(&bh.len, ^uint64(0))
+
+ bh.cond.L.Unlock()
+ return item
+}
diff --git a/pkg/priority_queue/binary_heap_test.go b/pkg/priority_queue/binary_heap_test.go
new file mode 100644
index 00000000..663741ad
--- /dev/null
+++ b/pkg/priority_queue/binary_heap_test.go
@@ -0,0 +1,124 @@
+package priorityqueue
+
+import (
+ "fmt"
+ "math/rand"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+type Test int
+
+func (t Test) Ack() error {
+ return nil
+}
+
+func (t Test) Nack() error {
+ return nil
+}
+
+func (t Test) Body() []byte {
+ return nil
+}
+
+func (t Test) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (t Test) ID() string {
+ return "none"
+}
+
+func (t Test) Priority() int64 {
+ return int64(t)
+}
+
+func TestBinHeap_Init(t *testing.T) {
+ a := []Item{Test(2), Test(23), Test(33), Test(44), Test(1), Test(2), Test(2), Test(2), Test(4), Test(6), Test(99)}
+
+ bh := NewBinHeap(12)
+
+ for i := 0; i < len(a); i++ {
+ bh.Insert(a[i])
+ }
+
+ expected := []Item{Test(1), Test(2), Test(2), Test(2), Test(2), Test(4), Test(6), Test(23), Test(33), Test(44), Test(99)}
+
+ res := make([]Item, 0, 12)
+
+ for i := 0; i < 11; i++ {
+ item := bh.ExtractMin()
+ res = append(res, item)
+ }
+
+ require.Equal(t, expected, res)
+}
+
+func TestNewPriorityQueue(t *testing.T) {
+ insertsPerSec := uint64(0)
+ getPerSec := uint64(0)
+ stopCh := make(chan struct{}, 1)
+ pq := NewBinHeap(1000)
+
+ go func() {
+ tt3 := time.NewTicker(time.Millisecond * 10)
+ for {
+ select {
+ case <-tt3.C:
+ require.Less(t, pq.Len(), uint64(1002))
+ case <-stopCh:
+ return
+ }
+ }
+ }()
+
+ go func() {
+ tt := time.NewTicker(time.Second)
+
+ for {
+ select {
+ case <-tt.C:
+ fmt.Println(fmt.Sprintf("Insert per second: %d", atomic.LoadUint64(&insertsPerSec)))
+ atomic.StoreUint64(&insertsPerSec, 0)
+ fmt.Println(fmt.Sprintf("ExtractMin per second: %d", atomic.LoadUint64(&getPerSec)))
+ atomic.StoreUint64(&getPerSec, 0)
+ case <-stopCh:
+ tt.Stop()
+ return
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.ExtractMin()
+ atomic.AddUint64(&getPerSec, 1)
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ pq.Insert(Test(rand.Int())) //nolint:gosec
+ atomic.AddUint64(&insertsPerSec, 1)
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+ stopCh <- struct{}{}
+}
diff --git a/pkg/priority_queue/interface.go b/pkg/priority_queue/interface.go
new file mode 100644
index 00000000..d64aaf3d
--- /dev/null
+++ b/pkg/priority_queue/interface.go
@@ -0,0 +1,28 @@
+package priorityqueue
+
+type Queue interface {
+ Insert(item Item)
+ ExtractMin() Item
+ Len() uint64
+}
+
+// Item represents binary heap item
+type Item interface {
+ // ID is a unique item identifier
+ ID() string
+
+ // Priority returns the Item's priority to sort
+ Priority() int64
+
+ // Body is the Item payload
+ Body() []byte
+
+ // Context is the Item meta information
+ Context() ([]byte, error)
+
+ // Ack - acknowledge the Item after processing
+ Ack() error
+
+ // Nack - discard the Item
+ Nack() error
+}
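
A quick usage sketch of the contracts above together with the BinHeap added earlier (illustrative only, written as if inside the priorityqueue package and reusing the Test helper from binary_heap_test.go):

	var q Queue = NewBinHeap(1000)

	// lower Priority() values are extracted first (min-heap)
	q.Insert(Test(10))
	q.Insert(Test(1))
	q.Insert(Test(5))

	item := q.ExtractMin() // Test(1); blocks while the heap is empty
	_ = item.Priority()    // 1
	_ = q.Len()            // 2 items remain
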
diff --git a/pkg/process/state.go b/pkg/process/state.go
index 652ec77c..bfc3a287 100644
--- a/pkg/process/state.go
+++ b/pkg/process/state.go
@@ -32,20 +32,20 @@ type State struct {
}
// WorkerProcessState creates new worker state definition.
-func WorkerProcessState(w worker.BaseProcess) (State, error) {
+func WorkerProcessState(w worker.BaseProcess) (*State, error) {
const op = errors.Op("worker_process_state")
p, _ := process.NewProcess(int32(w.Pid()))
i, err := p.MemoryInfo()
if err != nil {
- return State{}, errors.E(op, err)
+ return nil, errors.E(op, err)
}
percent, err := p.CPUPercent()
if err != nil {
- return State{}, err
+ return nil, err
}
- return State{
+ return &State{
CPUPercent: percent,
Pid: int(w.Pid()),
Status: w.State().String(),
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 02f11d0b..380bfff7 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -23,7 +23,7 @@ type SyncWorkerImpl struct {
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) SyncWorker {
+func From(process *Process) *SyncWorkerImpl {
return &SyncWorkerImpl{
process: process,
fPool: sync.Pool{New: func() interface{} {
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/interface.go
index e10ecdae..e7503467 100644
--- a/pkg/worker_watcher/container/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -1,4 +1,5 @@
-package container
+
+package worker_watcher //nolint:stylecheck
import (
"context"
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 46709d71..eda3572f 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,6 +1,6 @@
package broadcast
-import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+import "github.com/spiral/roadrunner/v2/common/pubsub"
type Broadcaster interface {
GetDriver(key string) (pubsub.SubReader, error)
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 6ddef806..889dc2fa 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 2ee211f8..475076a0 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,7 +2,7 @@ package broadcast
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index bec01ac3..2ee83384 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -143,7 +143,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) serve(errCh chan error) {
var err error
const op = errors.Op("http_plugin_serve")
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -285,13 +285,13 @@ func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {
@@ -323,7 +323,7 @@ func (p *Plugin) Reset() error {
p.pool = nil
var err error
- p.pool, err = p.server.NewWorkerPool(context.Background(), pool.Config{
+ p.pool, err = p.server.NewWorkerPool(context.Background(), &pool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/informer/interface.go b/plugins/informer/interface.go
index 316c7bc1..d91ddf9d 100644
--- a/plugins/informer/interface.go
+++ b/plugins/informer/interface.go
@@ -11,7 +11,7 @@ Because Availabler implementation should present in every plugin
// Informer used to get workers from particular plugin or set of plugins
type Informer interface {
- Workers() []process.State
+ Workers() []*process.State
}
// Availabler interface should be implemented by every plugin which wish to report to the PHP worker that it available in the RR runtime
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index f8725ed7..c613af58 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -19,7 +19,7 @@ func (p *Plugin) Init() error {
}
// Workers provides BaseProcess slice with workers for the requested plugin
-func (p *Plugin) Workers(name string) []process.State {
+func (p *Plugin) Workers(name string) []*process.State {
svc, ok := p.withWorkers[name]
if !ok {
return nil
diff --git a/plugins/informer/rpc.go b/plugins/informer/rpc.go
index 3925ef64..02254865 100644
--- a/plugins/informer/rpc.go
+++ b/plugins/informer/rpc.go
@@ -11,7 +11,7 @@ type rpc struct {
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []process.State `json:"workers"`
+ Workers []*process.State `json:"workers"`
}
// List all resettable services.
@@ -38,3 +38,17 @@ func (rpc *rpc) Workers(service string, list *WorkerList) error {
return nil
}
+
+// sort.Sort
+
+func (w *WorkerList) Len() int {
+ return len(w.Workers)
+}
+
+func (w *WorkerList) Less(i, j int) bool {
+ return w.Workers[i].Pid < w.Workers[j].Pid
+}
+
+func (w *WorkerList) Swap(i, j int) {
+ w.Workers[i], w.Workers[j] = w.Workers[j], w.Workers[i]
+}
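
With Len/Less/Swap above, *WorkerList satisfies sort.Interface, so callers can order workers by PID. A small illustrative sketch (hypothetical caller, not part of the diff):

	import (
		"sort"

		"github.com/spiral/roadrunner/v2/pkg/process"
		"github.com/spiral/roadrunner/v2/plugins/informer"
	)

	// ...
	list := &informer.WorkerList{Workers: []*process.State{{Pid: 42}, {Pid: 7}}}
	sort.Sort(list) // ascending by Pid: 7, 42
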
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..1b613231
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,55 @@
+package jobs
+
+import (
+ "runtime"
+
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+const (
+ // name used to set pipeline name
+ pipelineName string = "name"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // NumPollers configures number of priority queue pollers
+ // Should be no more than 255
+ // Default - num logical cores
+ NumPollers uint8 `mapstructure:"num_pollers"`
+
+ // PipelineSize is the limit of the main jobs queue which consumes Items from the drivers' pipelines
+ // A driver pipeline might be much larger than the main jobs queue
+ PipelineSize uint64 `mapstructure:"pipeline_size"`
+
+ // Pool configures roadrunner workers pool.
+ Pool *poolImpl.Config `mapstructure:"Pool"`
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"`
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string `mapstructure:"consume"`
+}
+
+func (c *Config) InitDefaults() {
+ if c.Pool == nil {
+ c.Pool = &poolImpl.Config{}
+ }
+
+ if c.PipelineSize == 0 {
+ c.PipelineSize = 1_000_000
+ }
+
+ if c.NumPollers == 0 {
+ c.NumPollers = uint8(runtime.NumCPU())
+ }
+
+ for k := range c.Pipelines {
+ // set the pipeline name
+ c.Pipelines[k].With(pipelineName, k)
+ }
+
+ c.Pool.InitDefaults()
+}
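
To make the defaults above concrete, a short sketch (illustrative only, written as if inside the jobs package) of what InitDefaults leaves in an otherwise empty config:

	cfg := &Config{}
	cfg.InitDefaults()
	// cfg.PipelineSize == 1_000_000
	// cfg.NumPollers   == uint8(runtime.NumCPU())
	// cfg.Pool is a non-nil *poolImpl.Config with the pool defaults applied
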
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
new file mode 100644
index 00000000..aaed82c7
--- /dev/null
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-07-09T07:14:41.096Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.6 Safari/537.36" etag="0gh7yhPcQUpxg5xU25Ad" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc9q69tcwk96ZZGzL68csNLdt2tCQtM398sZgEdwYTG2ThP76J8mysaUDmGADWbhzG5A3+eybjlrodPR0HrmT4dfQw0FLU7ynFjpraZqKDI38oSOzdMSyzXTgLvI9ftJ8oOv/xXxQ4aNT38Nx6cQkDIPEn5QH++F4jPtJacyNovCxfNogDMpPnbh3WBro9t1AHv3pe8kwmx35zI/8i/27YSIdGrnZ+XwgHrpe+FgYQu0WOo3CMEm/jZ5OcUDhl4Emve7jgqP53CI8Tqpc0O3E5u3p15n7OdSH6t23b+376FDnk3twgyl/6avLy2sy8vnypEv+dC5uzj9942+QzDLIROF07GF6Z6WFTh6HfoK7E7dPjz4SWiBjw2QUkF8q+ToIx8lHd+QHlAy6H8mdvobjkBxwA/9uTMaiFIIngdvDQSeM/cQP6XiAB3T4AUeJT7ByIRxOwknh6DG/WS9MknBEH+sHwWkYhBGbMhoMsNnvk/E4icJ7XDjiWU5Poe8hg5RDmT4DPxWGOIjPcTjCSTQjp/CjumJpR0Z6Fad61Tb5wOOciDRTT8eGRfLRHU68nHDv8vvPEUu+cNzCeP41Hj/9q5LrHv4mY+v8O/5P6R9qEgaxR0id/wyjZBjehWM3aM9HT8o4np9zEVK4M8z+xkky43zrTpOwjHcCxWj2i15PIMJ/3haPnT3xm6e/ZvzXQnpZiJ84nEZ9vOz9OQASN7rDyZIT+Q0pdJaiO8KBm/gPZUEBoY1f2gl9MuecTBxbK9GIpdgZ1WQ3SafKryvytXgryyrfSpNulYJHutVxFLmzwmkTekIs0Vr+1s8nP7QT8nvykwL1kV+3GbmR73Paoz9mRULcD5JVK1KsviWKFchMd6pSbF1ElMnigqrqkDsph1RNTeMhuSn9c+Im/eGGCqusOTwD254OaQ5b6yHT5PgvjCvs8yy6qK5qbAEjhpFjpKhoMoOkqGh0uyE9o75z+vqcrlfVTea2WN08sh3TMFRL01VkOSUyQ0aDjH/5t4O+DNH/HpxDEyXnXYWYnxnj74imihSV0xdMU6LFafcxbHH2bEM35hangNBmqSg1heonGkUwQ1StIpksNkPgJ1mmoIksnT9pkYkkXjGf25xS01nUqrB0SWGdYzqriT/BgT/GtSoprBI1ZUHE5pgWcnelpBxVQBaopFQNUFIINaSkDAkvLc0MqKc5KeHD/DOlzvnJiNCtT1zLY3JUmTyRfxlslHT8kLmh9JheOEbAkxxy/5Ye6xMQ4qh82MP9MHJTZ5aeQ5EfpYSRPZp8u+N/2QR72cDnsEfActxNomk/mUY4O4EApCdeRMYm4tgwEkee9e6ph04P2vSgOG0yS3bFGbt35I/v8hfJn99xZ0HoegvOg+bexEw/eZhJjOVzvZxQbMX5eflvaLICc4txijR6kYVAeKgjJGcNAhYmovzNmZMrKVVbxqyLRcPAoP9BosFkH1k0pJ96JEDO27PMIEUy/5sA/6ua3pAAMF+BAMhIb+9lQCfXdjtl8E7kh5GfzPJpTImJQINwIo+fETtotmCy87OOkwSPJslcFDCDY7uvdEWZoTzdwix6Ena3Na9rf4TDafKcSb3LTFBmIlupKDNRUzIzs+PePfs1fLLMY1/t2tv74Npr5tZjerIqTn2kPtFt0xFTmAdeRF44+sDe/s6PiRrF1FwchPRoMsQll0rpzcq/x+7oTXhatlNOKSAN9LQsQGqYjQkN511orC807IpCI1MSuxUaapPxwKUAKgiNr5g8kQz9Zg5nmLtpjz4hfEFuFGQDP/GIoo3fIRwHFGTUuJvJN6xRjAwGA21BPtrsmcbOxIhRDthoKphV0CExojQkRjS5SoFmeg4oJlvaaYomqh+IJCCPJ/+fUJBGteLLc7E9APFl9m3cG+yJ2FcVrarYN8ym8CWn6hYghLrBZahnxj33l+X6jpHveamewLH/1+2xW1Eo87gxua9x0jLO6L2Iaoi5HyAhbhyOccO4cRyxEkQBcGMYgB1vNYUaTUKNhBTiu1H5eJpXVSkybZP/PtLHntxFrufj+bEMrGXmyU8HceC58TDnykXVPnldz3KnMKcboQBJ9AczG4Q+0o0n6YsO/Cc6j5SUcNR+wClFMeoZuhN6wejpjla7HbmPsX40jdmzGiUiZJeIyLQhBrdsmYisGrK8f82B94jsj/r96Pj2fPrUsZx/s3KOncTPgMCYO6IEEMi/GBIHVL5nw+VLypE1KTSRB9FOC85IjwmyDzuKtYGgkqIyy+DIzpKjY1SdprbOPzGLJ8ZHn1OdiqOIOlo1h5CaepErbukdFKy8f7LvR1ks8uW9FyfB1/ZaFwRZB3Taqc6WX+nFvMkWwq6NMb9L1FiRstKg94cX8wJXWOSNpW8gmDuZYo+YDVC0G+TIMuQyLFT+ex1SNiw5pIwgL0G1a8jDg2bEe3BoaXAIhJkhx4bA82qPJ2+EabnionPT/XdD97xBm1s1BW5xdCgDA5VWohqCIMswWoDhn6PftAqE2MZDl7hqwT7DUxXgiRwAnmAVUA3wjK/Oesrv+98/rfvvXffX6U/z+iYrAnhB0kctyp6CKGpE+lw5Nxex8dDt2sF/t7+02P3pX0A16SBsm6lJX7dQUHWssuusGrZVpBv5CjMrTnn+FaZmCbS5WWnhMkQUhMG34+vuhgLgWWku2PJZEAitQ5Q4mgjwrPK0KEpUQJToTYkSe5eiRC0KkhUFynNDZpuiBITZzkTJRph+eSbrVpXGRphG+6E0NFG+IHt5dTmRQBtfYaj1Kg0YEcr+UK/6gql3mUbeO+q1neUGjEyL619Rt8kDI2K3y4COrNdCvuB52p6QryESo76CGJXNr7DQNshXstgLFcnfb9o37Vqtd9NVlMEAst6RpSinp7KRPmCfpgMBsuyArHcoENCY9b5Hha0VxcpWw5DLxMULs941iQe/tr9eXt3Wynkvxm+21F37zcb+cN5yb6oBDkK74qCaQmm6uiIwpouSXt+GjyOXphx//d55mwyuO7tmcHkFdAPdpYQyTVtRLARh59hQFB0oZduO4YPsKoYPtKanMeyosj78NJoQkUqx0R9iYN3E667XVFVUTlOpKGtoUkr7WTKS6ijYhJEkSzQCkYDWKpC3jcIH38ORXJn+yhFlKkJlrWEC+URkNoMosJWe7OCpdF3BHFkuuweXfAXMLaob8vwHcSieuONsTGN3T2/6aewnPkHw33QFA7UXCDApwHzG0Om6B/5IOpU
xHYgx+8OLXVu8fD5dUNX5XqidKT62MAxMcMWcEZ1zhy0N9fMpe2VwFGcxcietfBHHPZ7FKeLYYZ/OmC7uitnd8mv64Xjg35EvB+7ozyRdHtDD7jhO3OA+/YmT/occCgxh9B6HDFt5pWexeHQdQFRmRKG74bbZUNCCH9mnJvaUopCO4kjs6TiAHNVrWIoGsqes68qmSLd99aP9VhWfZuqi4kOAPLXQNuXpjuO9m8dl5pGYJuIyINAArxI8jyNt+3GZZbMu8Cbnxo38glVdZ3PGfW7brs1YLkuzzN01oE0sXBLVFMvtUUnUi+E4oO3aMhrfE46THfN9yEZs206ReRBlfXm3kY1YJpmLmLkE4lfDcNSbxmsjgkY6+gqEiMFg1WJVSW7WgQGxP5xqGLIUtLeJALnOlrz9+nKxusGfL2wk7tgx7VpP7znB43SEy037eTh7bib6+fLQqlkeckLJ3OsNWxkgVTHE9uyGJayDrqlPJVLmy33nz1peGQRdo9u6QNSbxc1BdDgS1c+wHFWqk+zTZbtN0vyaxkHzNJ/F3rdM9JpjmCJRmZbgaNfXswN+9Xfbcn26yuhlNWEZe2VdZvMuSJMfxxcb25SVDZMlfdoXdHZvxJg07YqlLXU0bYMRIUe9OiENqRJZyxtPUppIphHrljMK4zSrkLsC6YVvLRimSVu4GJocDkMqtIVLY+EwOUjy0/WTN4ojpDhqFRzZW0WR7FX/SHGj+GmSwsPExvFY2oXmIeTcjz+g82dt6zBLjByffslG+jT3oLgeHWaIX5E6AtZnL162XsxsFBIw+fxfeTKkDrHhSPJflZMemgo0amks6ZFtwPZuda1jdQFrrOEgWu37cWwmfuRV1j8vr760r1ilzeXlxbbMLyDmZIO+HDKRg7z6zC9RI1iOWtEAM5rCiRzOa/9qn95c01TjwZ8jL2TdT4rp62kUtVjePvjwxpS65piSn2plWxUVZWhTHdNgFMoBwTPWkJaq9C6OHli/kbeYNUa2JWaNjZwHiyxnADqvOXzt0RrNfaznh6sjqkYaUDP1yFLMStcUXTLwUbVAbV0xK6DtaMr5rbxcZDcKdSsJal3VJQzomcmzsxR1TtEvlrt3YNFqWkXurr+j9GbIBsJXUpqa93Ic9+IJ5HJulLZWXJqfBBhNO7PM5VmIRUXmDXEmMoFC5a0mrjU5hfSGA1R6vu93rr2AfY+2Gp5CcgSx48YxCzOxTRNo4+y8GPiNYUtc3oTyTQXLlqy6TYTtUc6sYreR3es6VLkga79yZkgOHtPmqJhFBrK29ulOKIe0XbU/8Ptk5M8UA7K1ITvU82mPzLQd9yOOE1ljVmiS35ACNHSgYBlSgHXsUgCHA99usHX9DbM3YPCq4Vnk7BeDA00wrz5dXn26vp0bs0qDfTFXuZUVrd0tGrWWUrEiWq9hJwsYaXL89gr3MSEWj4tkbtymq42OzvCDn668gvN7bMuqz5e0tfHBCI/CaPYhPZUdcMcUIP1wTMHGDKaGk3yE19gt8g0k09f58J7lW2wpquI63szxKUaooUbKjWX5kByhpv38W1lKISNRvo6uUN5RyPROVtV/v3oPQPDXrOwmxZIPe6v2v+xRs1pwZRCFoxxxdKVmo/qCM62gLXqq5w3ASnJVsZCDm9MKjmocaVUVg7Q3WX3m3uJdUChIM+n4OW3R/OizO/YYxi47nfZZq5T244sRJ8GUNtCfy9nird4aSyJDFzCPZJbUsv2vtsKSuuylVd8RId3YQJn/u4u9bD4lmMqOg4K8Zz5kYzvawFs4pEkKZSl40hFpp/YzljJ/7i7SdU8nC0izSZX3lN7dpE5Cb1bYVqU3S56xkUrdkzolsoLcks3roDgxtiSfblvzjB1H6p7kcf/+YINtdOqezjd39XwEpdDYdiYr05I73d8EmYL9Bu6DqCmgtmhMXcjbG3NEihbbq13PZ1ugtbbVJX267MW/r+lrVQ+56cA25pd/O+jLEP3vwTk0UXLeVS5uziu3Fah7UZ/Ytsl2BKu/tiV9opeYPWnxgj7hCiKBHIGgG1jOp8shgff1fK+F3jXHsAWaEpMaNdE7eZIgzrOBhd2V5bnxAspG6T3Lyxb7PsbMchLLQ0QHKV1VwfzycXjIFkcNWDA2DmlBJ0+5vbUGa5olilQHKsTVgEJc1FQCxZB978w8fkdqJRNZsUWkIiB2vV2kAjFOITJBHxvcyfba7pFVB6PZitgAY14Ev6pvax1hrqf48Xr64+EPunBubi87ivX552jJBs+Lkk7rbZJaTCqxLonZtr1F3lQm/sRnG3rGrfXyUU1NNW25mLYITOe37nq4pmbGGit2Eybp+vkED7yQSUCqnz0c4GTtzF5T89VZI0gGv8J85zHJvZikkU2SbypbCJkeeHjgTgMKWMqYNILFtoKncB6Qs719AbR5tMgEqju3+4KELlJtsXzWVqFokQEIXaeGKAXoTGiS0G2nsSKFrQqmEQslnvaHXPKkRLmn6JFwAWBsIXoss6wSdR3QhzqAGq0p1EA9mPNGsbRvLFvf7TLXLlVkrD5ENkwLW1TzbPy8rG8NZG6njqIGVKqKVcYlMqCwLNj3vI6dX0FsbpLDq7qruSixOfuVDnu4H/IuHOwcmrSP+Kb3i3aXz9NOOSE1k7Vb8c7z3dptKG1B+zYbJzxFZ5zNc5fWScs6q5TGgMNLAgPIGQ3qP/Fwk6ot86eWxNQ3zWNsxjHiuoEFeQygFa+h1lHsALJM5mns/xani3bYmEcRW8+ImgsRvpVhRRCIQOEqDGwbppBNw4ZiKBqZxpGCdN3WLMM2NUXwJutbXbkMGgUpzDtxU9XKKhJDIEKzVmnTggzXZvzpKAIUUcVapDoKz0FIyknG0rJ093VaiaqJxNWpkKR0tmknynnGM9i0W5dwn1tgvxGEpY5mxNnVq9F6Y7Zb1uRk/xXRIoWjVdkFdPEeUbVoIruiJjI2VEQboVpOYbYntMtG5Abr8tMikDaiIExBQWiwywOzTXMWnP7SGWcP+MZ5CXwjZ1TALfD2mWUQcWWqlnhrVnM8s9MFuHXwjLUHTAN1sd0/rgF61p60j791r48vvrwo3tGBBvow4zTGNrLm5vuU0cUQuXu3z1Clbl5ZIunKkalVA6xuH1nrw5b8jEIaRZs72QQEw6+hh+kZ/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
new file mode 100644
index 00000000..7befb3c8
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -0,0 +1,58 @@
+package amqp
+
+// pipeline rabbitmq info
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ PrefetchCount int `mapstructure:"pipeline_size"`
+ Queue string `mapstructure:"queue"`
+ Priority int64 `mapstructure:"priority"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
+ Exclusive bool `mapstructure:"exclusive"`
+}
+
+func (c *Config) InitDefault() {
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "default"
+ }
+
+ if c.PrefetchCount == 0 {
+ c.PrefetchCount = 100
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:guest@localhost:5672/"
+ }
+}
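
These fallbacks are easiest to verify in isolation. A minimal sketch, written as a throwaway test assumed to live in the same package (the test name is invented for illustration):

package amqp

import "testing"

// sketch: an empty pipeline config resolves to the documented defaults
func TestConfigInitDefault_sketch(t *testing.T) {
	pipe := &Config{}
	pipe.InitDefault()
	if pipe.Exchange != "default" || pipe.ExchangeType != "direct" || pipe.PrefetchCount != 100 || pipe.Priority != 10 {
		t.Fatalf("unexpected pipeline defaults: %+v", pipe)
	}

	global := &GlobalCfg{}
	global.InitDefault()
	if global.Addr != "amqp://guest:guest@localhost:5672/" {
		t.Fatalf("unexpected global default: %+v", global)
	}
}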
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
new file mode 100644
index 00000000..31999e23
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -0,0 +1,416 @@
+package amqp
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/streadway/amqp"
+)
+
+type JobsConsumer struct {
+ sync.Mutex
+ log logger.Logger
+ pq priorityqueue.Queue
+ eh events.Handler
+
+ pipeline atomic.Value
+
+ // amqp connection
+ conn *amqp.Connection
+ consumeChan *amqp.Channel
+ publishChan *amqp.Channel
+
+ retryTimeout time.Duration
+ prefetchCount int
+ priority int64
+ exchangeName string
+ queue string
+ exclusive bool
+ consumeID string
+ connStr string
+ exchangeType string
+ routingKey string
+
+ delayCache map[string]struct{}
+
+ stopCh chan struct{}
+}
+
+// NewAMQPConsumer initializes a rabbitmq pipeline consumer
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) {
+ const op = errors.Op("new_amqp_consumer")
+	// we need to obtain two parts of the amqp configuration here:
+	// the first part is the connection address, located in the global section under the amqp pluginName
+	// the second part is the queues and other pipeline information
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ }
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeCfg.RoutingKey
+ jb.queue = pipeCfg.Queue
+ jb.exchangeType = pipeCfg.ExchangeType
+ jb.exchangeName = pipeCfg.Exchange
+ jb.prefetchCount = pipeCfg.PrefetchCount
+ jb.exclusive = pipeCfg.Exclusive
+ jb.priority = pipeCfg.Priority
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) {
+ const op = errors.Op("new_amqp_consumer_from_pipeline")
+	// we need to obtain two parts of the amqp configuration here:
+	// the first part is the connection address, located in the global section under the amqp pluginName
+	// the second part is the queues and other pipeline information
+ jb := &JobsConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeline.String(routingKey, "")
+ jb.queue = pipeline.String(queue, "default")
+ jb.exchangeType = pipeline.String(exchangeType, "direct")
+ jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
+ jb.prefetchCount = pipeline.Int(prefetch, 10)
+ jb.priority = int64(pipeline.Int(priority, 10))
+ jb.exclusive = pipeline.Bool(exclusive, true)
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // register the pipeline
+ // error here is always nil
+ _ = jb.Register(pipeline)
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func (j *JobsConsumer) Push(job *job.Job) error {
+ const op = errors.Op("rabbitmq_push")
+	// check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != job.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
+ }
+
+	// the lock is needed here to protect against a concurrent redial operation
+	// we may be in the redial state at this point
+ j.Lock()
+ defer j.Unlock()
+
+ // convert
+ msg := fromJob(job)
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+		// delay cache optimization:
+		// if a queue with this delay has already been declared, do not redeclare and rebind it
+		// Before -> 2.5k RPS with redeclaration
+		// After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
+			// publish the delayed message to the already declared delay queue
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+		// publish the delayed message to the newly declared delay queue
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ j.delayCache[tmpQ] = struct{}{}
+
+ return nil
+ }
+
+	// publish the message using the configured routing key
+ err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
+ j.pipeline.Store(pipeline)
+ return nil
+}
+
+func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // run listener
+ j.listener(deliv)
+
+ return nil
+}
+
+func (j *JobsConsumer) Pause(p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ err := j.consumeChan.Cancel(j.consumeID, true)
+ if err != nil {
+		j.log.Error("cancel consume channel, forcing close", "error", err)
+ errCl := j.consumeChan.Close()
+ if errCl != nil {
+ j.log.Error("force close failed", "error", err)
+ }
+ }
+}
+
+func (j *JobsConsumer) Resume(p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ j.log.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
+
+ err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ if err != nil {
+ j.log.Error("qos set failed", "error", err)
+ return
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ j.log.Error("consume operation failed", "error", err)
+ return
+ }
+
+ // run listener
+ j.listener(deliv)
+}
+
+func (j *JobsConsumer) Stop() error {
+ j.stopCh <- struct{}{}
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ return nil
+}
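
The delayed-delivery branch in Push is the least obvious part of this consumer: for every distinct delay it declares a temporary queue whose dead-letter settings route the message back to the main exchange once the TTL fires. A standalone sketch of the arguments it builds, assuming an invented 5-second delay and illustrative exchange/queue names:

package main

import (
	"fmt"

	"github.com/streadway/amqp"
)

func main() {
	delayMs := int64(5 * 1000) // a 5-second delay expressed in milliseconds
	tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, "amqp.default", "jobs")

	args := amqp.Table{
		"x-dead-letter-exchange":    "amqp.default", // republish here once the TTL fires
		"x-dead-letter-routing-key": "jobs",
		"x-message-ttl":             delayMs,     // hold the message for the delay
		"x-expires":                 delayMs * 2, // auto-delete the temporary queue afterwards
	}

	fmt.Println(tmpQ, args)
}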
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
new file mode 100644
index 00000000..7c300c88
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -0,0 +1,187 @@
+package amqp
+
+import (
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/streadway/amqp"
+)
+
+type Item struct {
+	// Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+	// Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+
+	// Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+	AckFunc func(multiple bool) error
+
+	// Nack negatively acknowledges the delivery of message(s) identified by the delivery tag from either the client or server.
+	// When multiple is true, nack all messages up to and including the given delivery tag on the same channel.
+	// When requeue is true, request the server to deliver this message to a different consumer. If that is not possible or requeue is false, the message will be dropped or delivered to a server-configured dead-letter queue.
+	// This method must not be used to select or requeue messages the client wishes not to handle; rather, it is to inform the server that the client is incapable of handling this message at this time.
+	NackFunc func(multiple bool, requeue bool) error
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+	// Priority is the job priority, default - 10
+	Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+	// Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the amqp driver; amqp.Table is used instead
+func (j *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (j *Item) Ack() error {
+ return j.AckFunc(false)
+}
+
+func (j *Item) Nack() error {
+ return j.NackFunc(false, false)
+}
+
+func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ item, err := j.unpack(d)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ return &Item{
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ AckFunc: d.Ack,
+ NackFunc: d.Nack,
+ }, nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}
+
+// pack job metadata into headers
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
+ return amqp.Table{
+ job.RRID: id,
+ job.RRJob: j.Job,
+ job.RRPipeline: j.Options.Pipeline,
+ job.RRHeaders: headers,
+ job.RRTimeout: j.Options.Timeout,
+ job.RRDelay: j.Options.Delay,
+ job.RRPriority: j.Options.Priority,
+ }, nil
+}
+
+// unpack restores jobs.Options
+func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+
+ if _, ok := d.Headers[job.RRID].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
+ }
+
+ item.Ident = d.Headers[job.RRID].(string)
+
+ if _, ok := d.Headers[job.RRJob].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
+ }
+
+ item.Job = d.Headers[job.RRJob].(string)
+
+ if _, ok := d.Headers[job.RRPipeline].(string); ok {
+ item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
+ }
+
+ if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &item.Headers)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if _, ok := d.Headers[job.RRTimeout].(int64); ok {
+ item.Options.Timeout = d.Headers[job.RRTimeout].(int64)
+ }
+
+ if _, ok := d.Headers[job.RRDelay].(int64); ok {
+ item.Options.Delay = d.Headers[job.RRDelay].(int64)
+ }
+
+ if _, ok := d.Headers[job.RRPriority]; !ok {
+ // set pipe's priority
+ item.Options.Priority = j.priority
+ } else {
+ item.Options.Priority = d.Headers[job.RRPriority].(int64)
+ }
+
+ return item, nil
+}
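
Because the job metadata travels as AMQP headers rather than in the message body, a short same-package sketch of what pack produces may help; the job name, payload and pipeline below are made up:

package amqp

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/plugins/jobs/job"
)

// sketch: job metadata ends up in an amqp.Table keyed by the rr_* constants
func packSketch() {
	it := &Item{
		Job:     "app.job.Mail",
		Ident:   "id-1",
		Payload: `{"to":"user@example.com"}`,
		Options: &Options{Pipeline: "emails", Priority: 10, Delay: 5},
	}

	tbl, err := pack(it.Ident, it)
	if err != nil {
		panic(err)
	}

	fmt.Println(tbl[job.RRJob], tbl[job.RRPipeline], tbl[job.RRDelay]) // app.job.Mail emails 5
}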
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
new file mode 100644
index 00000000..7241c717
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -0,0 +1,25 @@
+package amqp
+
+import "github.com/streadway/amqp"
+
+func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case msg, ok := <-deliv:
+ if !ok {
+ j.log.Info("delivery channel closed, leaving the rabbit listener")
+ return
+ }
+
+ d, err := j.fromDelivery(msg)
+ if err != nil {
+ j.log.Error("amqp delivery convert", "error", err)
+ continue
+ }
+ // insert job into the main priority queue
+ j.pq.Insert(d)
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
new file mode 100644
index 00000000..624f4405
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/plugin.go
@@ -0,0 +1,40 @@
+package amqp
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "amqp"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline constructs AMQP driver from pipeline
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
new file mode 100644
index 00000000..d6b8a708
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/rabbit_init.go
@@ -0,0 +1,65 @@
+package amqp
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+)
+
+func (j *JobsConsumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ j.exchangeName,
+ j.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ j.queue,
+ false,
+ false,
+ j.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ j.routingKey,
+ j.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventInitialized,
+ Driver: "amqp",
+ Start: time.Now(),
+ })
+ return channel.Close()
+}
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
new file mode 100644
index 00000000..0b52a4d1
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -0,0 +1,126 @@
+package amqp
+
+import (
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/streadway/amqp"
+)
+
+// redialer is used to redial to rabbitmq in case the connection is interrupted
+func (j *JobsConsumer) redialer() { //nolint:gocognit
+ go func() {
+ const op = errors.Op("rabbitmq_redial")
+
+ for {
+ select {
+ case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ if err == nil {
+ return
+ }
+
+ j.Lock()
+
+ t := time.Now()
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeError,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Error: err,
+ Start: time.Now(),
+ })
+
+ j.log.Error("connection closed, reconnecting", "error", err)
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = j.retryTimeout
+ op := func() error {
+ j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ var dialErr error
+ j.conn, dialErr = amqp.Dial(j.connStr)
+ if dialErr != nil {
+ return errors.E(op, dialErr)
+ }
+
+					j.log.Info("rabbitmq dial succeeded, trying to redeclare queues and subscribers")
+
+ // re-init connection
+ errInit := j.initRabbitMQ()
+ if errInit != nil {
+ j.log.Error("rabbitmq dial", "error", errInit)
+ return errInit
+ }
+
+ // redeclare consume channel
+ var errConnCh error
+ j.consumeChan, errConnCh = j.conn.Channel()
+ if errConnCh != nil {
+ return errors.E(op, errConnCh)
+ }
+
+ // redeclare publish channel
+ var errPubCh error
+ j.publishChan, errPubCh = j.conn.Channel()
+ if errPubCh != nil {
+ return errors.E(op, errPubCh)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // restart listener
+ j.listener(deliv)
+
+ j.log.Info("queues and subscribers redeclared successfully")
+ return nil
+ }
+
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.Unlock()
+ j.log.Error("backoff failed", "error", retryErr)
+ return
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: time.Since(t),
+ })
+
+ j.Unlock()
+
+ case <-j.stopCh:
+ err := j.publishChan.Close()
+ if err != nil {
+ j.log.Error("publish channel close", "error", err)
+ }
+ err = j.consumeChan.Close()
+ if err != nil {
+ j.log.Error("consume channel close", "error", err)
+ }
+ err = j.conn.Close()
+ if err != nil {
+ j.log.Error("amqp connection close", "error", err)
+ }
+ }
+ }
+ }()
+}
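
The redial loop delegates its reconnect policy to cenkalti/backoff; the retry shape is easier to see in isolation. A minimal sketch with a fake dial operation that succeeds on the third attempt (the attempt counter and budget are illustrative):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("dial failed") // pretend the broker is still down
		}
		return nil
	}

	expb := backoff.NewExponentialBackOff()
	expb.MaxElapsedTime = time.Minute // overall retry budget, analogous to retryTimeout above

	if err := backoff.Retry(op, expb); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println("reconnected after", attempts, "attempts")
}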
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
new file mode 100644
index 00000000..45ee8083
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -0,0 +1,204 @@
+package ephemeral
+
+import (
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pipelineSize string = "pipeline_size"
+)
+
+type Config struct {
+ PipelineSize uint64 `mapstructure:"pipeline_size"`
+}
+
+type JobBroker struct {
+ cfg *Config
+ log logger.Logger
+ eh events.Handler
+ pipeline sync.Map
+ pq priorityqueue.Queue
+ localQueue chan *Item
+
+ stopCh chan struct{}
+}
+
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) {
+ const op = errors.Op("new_ephemeral_pipeline")
+
+ jb := &JobBroker{
+ log: log,
+ pq: pq,
+ eh: eh,
+ stopCh: make(chan struct{}, 1),
+ }
+
+ err := cfg.UnmarshalKey(configKey, &jb.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if jb.cfg.PipelineSize == 0 {
+ jb.cfg.PipelineSize = 100_000
+ }
+
+ // initialize a local queue
+ jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+
+ // consume from the queue
+ go jb.consume()
+
+ return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) {
+ jb := &JobBroker{
+ log: log,
+ pq: pq,
+ eh: eh,
+ stopCh: make(chan struct{}, 1),
+ }
+
+	jb.cfg = &Config{PipelineSize: uint64(pipeline.Int(pipelineSize, 100_000))}
+
+ // initialize a local queue
+ jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+
+ // consume from the queue
+ go jb.consume()
+
+ return jb, nil
+}
+
+func (j *JobBroker) Push(jb *job.Job) error {
+ const op = errors.Op("ephemeral_push")
+
+	// check if the pipeline is registered
+ if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
+ }
+
+ msg := fromJob(jb)
+ // handle timeouts
+ if msg.Options.Timeout > 0 {
+ go func(jj *job.Job) {
+ time.Sleep(jj.Options.TimeoutDuration())
+
+ // send the item after timeout expired
+ j.localQueue <- msg
+ }(jb)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ j.localQueue <- msg
+
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+}
+
+func (j *JobBroker) consume() {
+	// redirect items from the local queue into the priority queue
+ for {
+ select {
+ case item := <-j.localQueue:
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+}
+
+func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.pipeline.Load(pipeline.Name()); ok {
+		return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline.Name()))
+ }
+
+ j.pipeline.Store(pipeline.Name(), true)
+
+ return nil
+}
+
+func (j *JobBroker) Pause(pipeline string) {
+ if q, ok := j.pipeline.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.pipeline.Store(pipeline, false)
+ }
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipeline,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+}
+
+func (j *JobBroker) Resume(pipeline string) {
+ if q, ok := j.pipeline.Load(pipeline); ok {
+ if q == false {
+			// mark pipeline as turned on
+ j.pipeline.Store(pipeline, true)
+ }
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipeline,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+}
+
+func (j *JobBroker) List() []string {
+ out := make([]string, 0, 2)
+
+ j.pipeline.Range(func(key, value interface{}) bool {
+ pipe := key.(string)
+ out = append(out, pipe)
+ return true
+ })
+
+ return out
+}
+
+// Run is no-op for the ephemeral
+func (j *JobBroker) Run(_ *pipeline.Pipeline) error {
+ return nil
+}
+
+func (j *JobBroker) Stop() error {
+ var pipe string
+ j.pipeline.Range(func(key, _ interface{}) bool {
+ pipe = key.(string)
+ j.pipeline.Delete(key)
+ return true
+ })
+
+ // return from the consumer
+ j.stopCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+
+ return nil
+}
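
At its core this driver is a bounded channel drained into the priority queue by consume(); the same producer/consumer shape in a standalone sketch (names and the queue size are illustrative, not the plugin's API):

package main

import "fmt"

func main() {
	localQueue := make(chan string, 4) // bounded, like pipeline_size
	done := make(chan struct{})

	// the consume() analogue: drain the local queue into the "priority queue"
	go func() {
		for item := range localQueue {
			fmt.Println("insert into pq:", item)
		}
		close(done)
	}()

	for i := 0; i < 3; i++ {
		localQueue <- fmt.Sprintf("job-%d", i)
	}
	close(localQueue)
	<-done
}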
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
new file mode 100644
index 00000000..442533c5
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -0,0 +1,112 @@
+package ephemeral
+
+import (
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ },
+ }
+}
+
+type Item struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+	// Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+	// Priority is the job priority, default - 10
+	Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+	// Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (j *Item) ID() string {
+ return j.Ident
+}
+
+func (j *Item) Priority() int64 {
+ return j.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (j *Item) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (j *Item) Ack() error {
+ // noop for the in-memory
+ return nil
+}
+
+func (j *Item) Nack() error {
+ // noop for the in-memory
+ return nil
+}
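
For reference, a same-package sketch of what Context() serializes for an in-memory item; the identifiers and pipeline name are invented:

package ephemeral

import "fmt"

// sketch: Context() packs the metadata that accompanies the payload
func contextSketch() {
	it := &Item{Ident: "id-1", Job: "app.job.Mail", Options: &Options{Pipeline: "local", Timeout: 60}}

	ctx, err := it.Context()
	if err != nil {
		panic(err)
	}

	fmt.Println(string(ctx))
	// {"id":"id-1","job":"app.job.Mail","headers":null,"timeout":60,"pipeline":"local"}
}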
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go
new file mode 100644
index 00000000..28495abb
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/plugin.go
@@ -0,0 +1,41 @@
+package ephemeral
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "ephemeral"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipeline, p.log, e, pq)
+}
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
new file mode 100644
index 00000000..0b4e8157
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -0,0 +1,103 @@
+package sqs
+
+type GlobalCfg struct {
+ Key string `mapstructure:"key"`
+ Secret string `mapstructure:"secret"`
+ Region string `mapstructure:"region"`
+ SessionToken string `mapstructure:"session_token"`
+ Endpoint string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ // The duration (in seconds) that the received messages are hidden from subsequent
+ // retrieve requests after being retrieved by a ReceiveMessage request.
+ VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+ // The duration (in seconds) for which the call waits for a message to arrive
+ // in the queue before returning. If a message is available, the call returns
+ // sooner than WaitTimeSeconds. If no messages are available and the wait time
+ // expires, the call returns successfully with an empty list of messages.
+ WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+ // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages
+ // than this value (however, fewer messages might be returned). Valid values: 1 to
+ // 10. Default: 1.
+ PrefetchCount int32 `mapstructure:"pipeline_size"`
+ // The name of the new queue. The following limits apply to this name:
+ //
+ // * A queue
+ // name can have up to 80 characters.
+ //
+ // * Valid values: alphanumeric characters,
+ // hyphens (-), and underscores (_).
+ //
+ // * A FIFO queue name must end with the .fifo
+ // suffix.
+ //
+ // Queue URLs and names are case-sensitive.
+ //
+ // This member is required.
+ Queue string `mapstructure:"queue"`
+
+ // A map of attributes with their corresponding values. The following lists the
+ // names, descriptions, and values of the special request parameters that the
+ // CreateQueue action uses.
+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+ Attributes map[string]string `mapstructure:"attributes"`
+
+ // From amazon docs:
+ // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
+ // Tagging Your Amazon SQS Queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
+ // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
+ // guidelines in mind:
+ //
+ // * Adding more than 50 tags to a queue isn't recommended.
+ //
+ // *
+ // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
+ // strings.
+ //
+ // * Tags are case-sensitive.
+ //
+ // * A new tag with a key identical to that
+ // of an existing tag overwrites the existing tag.
+ //
+ // For a full list of tag
+ // restrictions, see Quotas related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
+ // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
+ // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
+ // permissions don't apply to this action. For more information, see Grant
+ // cross-account permissions to a role and a user name
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
+ // in the Amazon SQS Developer Guide.
+ Tags map[string]string `mapstructure:"tags"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Endpoint == "" {
+ c.Endpoint = "http://localhost:9324"
+ }
+}
+
+func (c *Config) InitDefault() {
+ if c.Queue == "" {
+ c.Queue = "default"
+ }
+
+ if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
+ c.PrefetchCount = 10
+ }
+
+ if c.WaitTimeSeconds == 0 {
+ c.WaitTimeSeconds = 5
+ }
+
+ if c.Attributes == nil {
+ c.Attributes = make(map[string]string)
+ }
+
+ if c.Tags == nil {
+ c.Tags = make(map[string]string)
+ }
+}
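
As with the amqp driver, these defaults are easiest to read as a throwaway same-package test (the test name is invented); note in particular that the prefetch is clamped to the SQS maximum of 10:

package sqs

import "testing"

// sketch: pipeline and global defaults, including the prefetch clamp
func TestSQSConfigDefaults_sketch(t *testing.T) {
	c := &Config{PrefetchCount: 50} // anything above 10 is pulled back to 10
	c.InitDefault()
	if c.PrefetchCount != 10 || c.WaitTimeSeconds != 5 || c.Queue != "default" {
		t.Fatalf("unexpected pipeline defaults: %+v", c)
	}

	g := &GlobalCfg{}
	g.InitDefault()
	if g.Endpoint != "http://localhost:9324" {
		t.Fatalf("unexpected global default: %q", g.Endpoint)
	}
}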
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
new file mode 100644
index 00000000..cb7cb4af
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -0,0 +1,229 @@
+package sqs
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ pq priorityqueue.Queue
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+
+ // connection info
+ key string
+ secret string
+ sessionToken string
+ region string
+ endpoint string
+ queue string
+ messageGroupID string
+ waitTime int32
+ prefetch int32
+ visibilityTimeout int32
+
+	// if the user invokes several resume operations
+ listeners uint32
+
+ // queue optional parameters
+ attributes map[string]string
+ tags map[string]string
+
+ client *sqs.Client
+ outputQ *sqs.CreateQueueOutput
+
+ pauseCh chan struct{}
+}
+
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: pipeCfg.Attributes,
+ tags: pipeCfg.Tags,
+ queue: pipeCfg.Queue,
+ prefetch: pipeCfg.PrefetchCount,
+ visibilityTimeout: pipeCfg.VisibilityTimeout,
+ waitTime: pipeCfg.WaitTimeSeconds,
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ return &JobConsumer{}, nil
+}
+
+func (j *JobConsumer) Push(jb *job.Job) error {
+ const op = errors.Op("sqs_push")
+	// check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // The length of time, in seconds, for which to delay a specific message. Valid
+ // values: 0 to 900. Maximum: 15 minutes.
+ if jb.Options.Delay > 900 {
+ return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
+ }
+
+ msg := fromJob(jb)
+
+ // The new value for the message's visibility timeout (in seconds). Values range: 0
+ // to 43200. Maximum: 12 hours.
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+ j.pipeline.Store(pipeline)
+ return nil
+}
+
+func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+	const op = errors.Op("sqs_consume")
+
+ j.Lock()
+ defer j.Unlock()
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ atomic.AddUint32(&j.listeners, 1)
+
+ // start listener
+ go j.listen()
+
+ return nil
+}
+
+func (j *JobConsumer) Stop() error {
+ j.pauseCh <- struct{}{}
+ return nil
+}
+
+func (j *JobConsumer) Pause(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop consume
+ j.pauseCh <- struct{}{}
+}
+
+func (j *JobConsumer) Resume(p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+ atomic.AddUint32(&j.listeners, 1)
+}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
new file mode 100644
index 00000000..815b68c6
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -0,0 +1,227 @@
+package sqs
+
+import (
+ "context"
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ StringType string = "String"
+ NumberType string = "Number"
+ ApproximateReceiveCount string = "ApproximateReceiveCount"
+)
+
+var attributes = []string{
+ job.RRJob,
+ job.RRDelay,
+ job.RRTimeout,
+ job.RRPriority,
+ job.RRMaxAttempts,
+}
+
+type Item struct {
+	// Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+	// Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+	// Priority is the job priority, default - 10
+	Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+	// Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+
+ // Maximum number of attempts to receive and process the message
+ MaxAttempts int64 `json:"max_attempts,omitempty"`
+
+ // Private ================
+ approxReceiveCount int64
+ queue *string
+ receiptHandler *string
+ client *sqs.Client
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int64) bool {
+	// Attempts 1 and 0 have an identical effect
+ return o.MaxAttempts > (attempt + 1)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used in the sqs driver; MessageAttributes are used instead
+func (i *Item) Context() ([]byte, error) {
+ return nil, nil
+}
+
+func (i *Item) Ack() error {
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Nack() error {
+ if i.Options.CanRetry(i.Options.approxReceiveCount) {
+ return nil
+ }
+
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
+ MaxAttempts: job.Options.Attempts,
+ },
+ }
+}
+
+func (i *Item) pack(queue *string) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ MessageBody: aws.String(i.Payload),
+ QueueUrl: queue,
+ DelaySeconds: int32(i.Options.Delay),
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
+ },
+ }
+}
+
+func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) {
+ const op = errors.Op("sqs_unpack")
+ // reserved
+ if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
+ return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ }
+
+ for i := 0; i < len(attributes); i++ {
+ if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
+ return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ }
+ }
+
+ attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
+ if err != nil {
+ return nil, 0, errors.E(op, err)
+ }
+
+ item := &Item{
+ Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Payload: *msg.Body,
+ Options: &Options{
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
+ MaxAttempts: int64(attempt),
+
+ // private
+ approxReceiveCount: int64(recCount),
+ client: client,
+ queue: queue,
+ receiptHandler: msg.ReceiptHandle,
+ },
+ }
+
+ return item, recCount, nil
+}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
new file mode 100644
index 00000000..bb6f8c7a
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -0,0 +1,66 @@
+package sqs
+
+import (
+ "context"
+
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+)
+
+const (
+ All string = "All"
+)
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.pauseCh:
+ return
+ default:
+ message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ MaxNumberOfMessages: j.prefetch,
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
+ MessageAttributeNames: []string{All},
+ VisibilityTimeout: j.visibilityTimeout,
+ WaitTimeSeconds: j.waitTime,
+ })
+ if err != nil {
+ j.log.Error("receive message", "error", err)
+ continue
+ }
+
+ for i := 0; i < len(message.Messages); i++ {
+ m := message.Messages[i]
+ item, attempt, err := unpack(&m, j.outputQ.QueueUrl, j.client)
+ if err != nil {
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+						j.log.Error("message unpack, failed to delete the message from the queue", "error", err, "deleteErr", errD)
+ continue
+ }
+
+ j.log.Error("message unpack", "error", err)
+ continue
+ }
+
+ if item.Options.CanRetry(int64(attempt)) {
+ j.pq.Insert(item)
+ continue
+ }
+
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.outputQ.QueueUrl,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+					j.log.Error("failed to delete the message from the queue", "error", errD)
+ continue
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "sqs"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/general.go
new file mode 100644
index 00000000..2c7d04f0
--- /dev/null
+++ b/plugins/jobs/job/general.go
@@ -0,0 +1,31 @@
+package job
+
+// constant keys to pack/unpack messages from different drivers
+const (
+ RRID string = "rr_id"
+ RRJob string = "rr_job"
+ RRHeaders string = "rr_headers"
+ RRPipeline string = "rr_pipeline"
+ RRTimeout string = "rr_timeout"
+ RRDelay string = "rr_delay"
+ RRPriority string = "rr_priority"
+ RRMaxAttempts string = "rr_max_attempts"
+)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Ident is unique identifier of the job, should be provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
new file mode 100644
index 00000000..af971d15
--- /dev/null
+++ b/plugins/jobs/job/job_options.go
@@ -0,0 +1,74 @@
+package job
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+	// Priority is the job priority, default - 10
+	Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+	// Attempts defines the maximum number of job retries. Attention: a value of 1 only allows the job to execute once (without retry).
+	// The minimum meaningful value is 2.
+ Attempts int64 `json:"maxAttempts,omitempty"`
+
+	// RetryDelay defines how long the job should wait until the next retry. Defaults to none.
+ RetryDelay int64 `json:"retryDelay,omitempty"`
+
+	// Timeout defines how long the broker should wait before treating the job as failed. Defaults to 30 min.
+ Timeout int64 `json:"timeout,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Attempts == 0 {
+ o.Attempts = from.Attempts
+ }
+
+ if o.Timeout == 0 {
+ o.Timeout = from.Timeout
+ }
+
+ if o.RetryDelay == 0 {
+ o.RetryDelay = from.RetryDelay
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int64) bool {
+	// Attempts 1 and 0 have an identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go
new file mode 100644
index 00000000..f4b1dc0c
--- /dev/null
+++ b/plugins/jobs/job/job_options_test.go
@@ -0,0 +1,110 @@
+package job
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestOptions_CanRetry(t *testing.T) {
+ opts := &Options{Attempts: 0}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_SameValue(t *testing.T) {
+ opts := &Options{Attempts: 1}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_Value(t *testing.T) {
+ opts := &Options{Attempts: 2}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_CanRetry_Value3(t *testing.T) {
+ opts := &Options{Attempts: 3}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.True(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_RetryDuration(t *testing.T) {
+ opts := &Options{RetryDelay: 0}
+ assert.Equal(t, time.Duration(0), opts.RetryDuration())
+}
+
+func TestOptions_RetryDuration2(t *testing.T) {
+ opts := &Options{RetryDelay: 1}
+ assert.Equal(t, time.Second, opts.RetryDuration())
+}
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_TimeoutDuration(t *testing.T) {
+ opts := &Options{Timeout: 0}
+ assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
+}
+
+func TestOptions_TimeoutDuration2(t *testing.T) {
+ opts := &Options{Timeout: 1}
+ assert.Equal(t, time.Second, opts.TimeoutDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, int64(1), opts.Attempts)
+ assert.Equal(t, int64(2), opts.Delay)
+ assert.Equal(t, int64(1), opts.Timeout)
+ assert.Equal(t, int64(1), opts.RetryDelay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ Timeout: 10,
+ Attempts: 10,
+ RetryDelay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, int64(10), opts.Attempts)
+ assert.Equal(t, int64(10), opts.Delay)
+ assert.Equal(t, int64(10), opts.Timeout)
+ assert.Equal(t, int64(10), opts.RetryDelay)
+}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
new file mode 100644
index 00000000..90eeb189
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -0,0 +1,78 @@
+package pipeline
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+const (
+ priority string = "priority"
+ driver string = "driver"
+ name string = "name"
+)
+
+// With sets a pipeline option value by name.
+func (p *Pipeline) With(name string, value interface{}) {
+ (*p)[name] = value
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String(name, "")
+}
+
+// Driver associated with the pipeline.
+func (p Pipeline) Driver() string {
+ return p.String(driver, "")
+}
+
+// Has checks whether a value is present in the pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Int must return option value as int or return the default value.
+func (p Pipeline) Int(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(int); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
+// Bool must return option value as bool or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(bool); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
+// Priority returns default pipeline priority
+func (p Pipeline) Priority() int64 {
+ if value, ok := p[priority]; ok {
+ if v, ok := value.(int64); ok {
+ return v
+ }
+ }
+
+ return 10
+}
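
An illustrative sketch of filling and reading a Pipeline (the pipeline name, driver and the "prefetch" option are hypothetical):

    package example

    import (
        "fmt"

        "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
    )

    func pipelineSketch() {
        pipe := pipeline.Pipeline{
            "name":   "test-local",
            "driver": "ephemeral",
        }

        // With mutates the underlying map through the pointer receiver
        pipe.With("prefetch", 100)

        fmt.Println(pipe.Name())             // test-local
        fmt.Println(pipe.Driver())           // ephemeral
        fmt.Println(pipe.Int("prefetch", 1)) // 100
        fmt.Println(pipe.Priority())         // 10 (default when the key is absent)
    }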
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
new file mode 100644
index 00000000..4482c70d
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -0,0 +1,21 @@
+package pipeline
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
new file mode 100644
index 00000000..ce51df21
--- /dev/null
+++ b/plugins/jobs/plugin.go
@@ -0,0 +1,525 @@
+package jobs
+
+import (
+ "context"
+ "fmt"
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+)
+
+const (
+ // RrJobs env variable
+ RrJobs string = "rr_jobs"
+ PluginName string = "jobs"
+
+ pipelines string = "pipelines"
+)
+
+type Plugin struct {
+ cfg *Config `mapstructure:"jobs"`
+ log logger.Logger
+
+ sync.RWMutex
+
+ workersPool pool.Pool
+ server server.Server
+
+ jobConstructors map[string]jobs.Constructor
+ consumers map[string]jobs.Consumer
+
+ events events.Handler
+
+ // priority queue implementation
+ queue priorityqueue.Queue
+
+ // parent config for broker options; keys are pipeline names, values are pointers to the associated pipelines
+ pipelines sync.Map
+
+ // initial set of the pipelines to consume
+ consume map[string]struct{}
+
+ stopCh chan struct{}
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+ const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.server = server
+
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(p.collectJobsEvents)
+
+ p.jobConstructors = make(map[string]jobs.Constructor)
+ p.consumers = make(map[string]jobs.Consumer)
+ p.consume = make(map[string]struct{})
+ p.stopCh = make(chan struct{}, 1)
+
+ // initial set of pipelines
+ for i := range p.cfg.Pipelines {
+ p.pipelines.Store(i, p.cfg.Pipelines[i])
+ }
+
+ if len(p.cfg.Consume) > 0 {
+ for i := 0; i < len(p.cfg.Consume); i++ {
+ p.consume[p.cfg.Consume[i]] = struct{}{}
+ }
+ }
+
+ // initialize priority queue
+ p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
+ p.log = log
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error { //nolint:gocognit
+ errCh := make(chan error, 1)
+ const op = errors.Op("jobs_plugin_serve")
+
+ // register initial pipelines
+ p.pipelines.Range(func(key, value interface{}) bool {
+ t := time.Now()
+ // pipeline name (e.g. test-local, sqs-aws)
+ name := key.(string)
+
+ // pipeline associated with the name
+ pipe := value.(*pipeline.Pipeline)
+ // driver for the pipeline (e.g. amqp, ephemeral)
+ dr := pipe.Driver()
+
+ // jobConstructors contains constructors for the drivers;
+ // we use them here to initialize the drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // config key for the particular sub-driver jobs.pipelines.test-local
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
+
+ // init the driver
+ initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[name] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(pipe)
+ if err != nil {
+ errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
+ return false
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ if _, ok := p.consume[name]; ok {
+ err = initializedDriver.Run(pipe)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: t.Sub(t),
+ })
+
+ return true
+ }
+
+ return true
+ }
+ p.events.Push(events.JobEvent{
+ Event: events.EventDriverReady,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: t.Sub(t),
+ })
+
+ return true
+ })
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"})
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // THESE ARE TEST HELPERS AND SHOULD BE DELETED BEFORE THE RELEASE !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+ var rate uint64
+ go func() {
+ tt := time.NewTicker(time.Second * 1)
+ for { //nolint:gosimple
+ select {
+ case <-tt.C:
+ fmt.Printf("---> rate is: %d\n", atomic.LoadUint64(&rate))
+ fmt.Printf("---> goroutines: %d\n", runtime.NumGoroutine())
+ fmt.Printf("---> curr len: %d\n", p.queue.Len())
+ atomic.StoreUint64(&rate, 0)
+ }
+ }
+ }()
+
+ // THESE ARE TEST HELPERS AND SHOULD BE DELETED BEFORE THE RELEASE !!!!!!!!!!!!!!!!!!!!!!!! <-----------------------------------------------------
+
+ // start listening
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ go func() {
+ for {
+ select {
+ case <-p.stopCh:
+ p.log.Debug("------> job poller stopped <------")
+ return
+ default:
+ // get data JOB from the queue
+ job := p.queue.ExtractMin()
+
+ ctx, err := job.Context()
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+ p.log.Error("job marshal context", "error", err)
+ continue
+ }
+
+ exec := payload.Payload{
+ Context: ctx,
+ Body: job.Body(),
+ }
+
+ // protect from the pool reset
+ p.RLock()
+ _, err = p.workersPool.Exec(exec)
+ if err != nil {
+ errNack := job.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+
+ p.RUnlock()
+ p.log.Error("job execute", "error", err)
+ continue
+ }
+ p.RUnlock()
+
+ errAck := job.Ack()
+ if errAck != nil {
+ p.log.Error("acknowledge failed", "error", errAck)
+ }
+ // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
+ atomic.AddUint64(&rate, 1)
+ }
+ }
+ }()
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ for k, v := range p.consumers {
+ err := v.Stop()
+ if err != nil {
+ p.log.Error("stop job driver", "driver", k)
+ continue
+ }
+ }
+
+ // this goroutine may block forever, but that's acceptable: it gives the pollers a chance to exit,
+ // and if they don't, it's not a problem.
+ // The main goal here is to stop the drivers.
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ // stop jobs plugin pollers
+ p.stopCh <- struct{}{}
+ }
+ }()
+
+ // give the pollers up to 5 seconds to exit before returning
+ time.Sleep(time.Second * 5)
+
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
+ p.jobConstructors[name.Name()] = c
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("jobs_plugin_reset")
+ p.log.Info("JOBS plugin got restart request. Restarting...")
+ p.workersPool.Destroy(context.Background())
+ p.workersPool = nil
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrJobs: "true"}, p.collectJobsEvents)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.log.Info("JOBS workers pool successfully restarted")
+
+ return nil
+}
+
+func (p *Plugin) Push(j *job.Job) error {
+ const op = errors.Op("jobs_plugin_push")
+
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ // TODO merge all options, not only priority
+ if j.Options.Priority == 0 {
+ j.Options.Priority = ppl.Priority()
+ }
+
+ err := d.Push(j)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (p *Plugin) PushBatch(j []*job.Job) error {
+ const op = errors.Op("jobs_plugin_push")
+
+ for i := 0; i < len(j); i++ {
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline))
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ if j[i].Options.Priority == 0 {
+ j[i].Options.Priority = ppl.Priority()
+ }
+
+ err := d.Push(j[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
+
+func (p *Plugin) Pause(pipelines []string) {
+ for i := 0; i < len(pipelines); i++ {
+ pipe, ok := p.pipelines.Load(pipelines[i])
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pipelines[i])
+ continue
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
+ return
+ }
+
+ // redirect call to the underlying driver
+ d.Pause(ppl.Name())
+ }
+}
+
+func (p *Plugin) Resume(pipelines []string) {
+ for i := 0; i < len(pipelines); i++ {
+ pipe, ok := p.pipelines.Load(pipelines[i])
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pipelines[i])
+ continue
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pipelines[i])
+ return
+ }
+
+ // redirect call to the underlying driver
+ d.Resume(ppl.Name())
+ }
+}
+
+// Declare a pipeline.
+func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("jobs_plugin_declare")
+ // driver for the pipeline (e.g. amqp, ephemeral)
+ dr := pipeline.Driver()
+ if dr == "" {
+ return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name()))
+ }
+
+ // jobConstructors contains constructors for the drivers;
+ // we use them here to initialize the drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // init the driver from pipeline
+ initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[pipeline.Name()] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(pipeline)
+ if err != nil {
+ return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ if _, ok := p.consume[pipeline.Name()]; ok {
+ err = initializedDriver.Run(pipeline)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ p.pipelines.Store(pipeline.Name(), pipeline)
+
+ return nil
+}
+
+// Destroy pipeline and release all associated resources.
+func (p *Plugin) Destroy(pp string) error {
+ const op = errors.Op("jobs_plugin_destroy")
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // delete consumer
+ delete(p.consumers, ppl.Name())
+ p.pipelines.Delete(pp)
+
+ return d.Stop()
+}
+
+func (p *Plugin) List() []string {
+ out := make([]string, 0, 10)
+
+ p.pipelines.Range(func(key, _ interface{}) bool {
+ // we can safely assert the key to a string here, as we only store string keys
+ out = append(out, key.(string))
+ return true
+ })
+
+ return out
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ log: p.log,
+ p: p,
+ }
+}
+
+func (p *Plugin) collectJobsEvents(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventJobStart:
+ p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushOK:
+ p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushError:
+ p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobError:
+ p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeRun:
+ p.log.Info("pipeline started", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeActive:
+ p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeStopped:
+ p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeError:
+ p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventDriverReady:
+ p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventInitialized:
+ p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC())
+ }
+ }
+}
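
As a rough sketch of how other code would push a job through this plugin (the handler name, ID and pipeline are made up, and the plugin is assumed to be already initialized by the endure container): Push looks up the pipeline, inherits the pipeline priority when the job priority is zero, and forwards the job to the driver registered for that pipeline.

    package example

    import (
        "github.com/spiral/roadrunner/v2/plugins/jobs"
        "github.com/spiral/roadrunner/v2/plugins/jobs/job"
    )

    // pushSketch assumes p was initialized by the container and that a
    // pipeline named "test-local" has been declared and registered.
    func pushSketch(p *jobs.Plugin) error {
        j := &job.Job{
            Job:     "App\\Jobs\\SendEmail", // hypothetical handler name
            Ident:   "job-1",                // hypothetical ID
            Payload: `{"to":"user@example.com"}`,
            Options: &job.Options{
                Pipeline: "test-local",
                // Priority left at 0 on purpose: Push inherits it from the pipeline
            },
        }

        return p.Push(j)
    }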
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..a2bd9c6d
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,119 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
+)
+
+type rpc struct {
+ log logger.Logger
+ p *Plugin
+}
+
+/*
+List of the RPC methods:
+1. Push - single job push
+2. PushBatch - push job batch
+
+3. Reset - managed by the Resetter plugin
+
+4. Pause - pauses set of pipelines
+5. Resume - resumes set of pipelines
+
+6. Workers - managed by the Informer plugin.
+7. Stat - jobs statistic
+*/
+
+func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ // convert transport entity into domain
+ // how we can do this quickly
+
+ if j.GetJob().GetId() == "" {
+ return errors.E(op, errors.Str("empty ID field not allowed"))
+ }
+
+ err := r.p.Push(r.from(j.GetJob()))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("jobs_rpc_push_batch")
+
+ l := len(j.GetJobs())
+
+ batch := make([]*job.Job, l)
+
+ for i := 0; i < l; i++ {
+ // convert the transport entity into the domain entity
+ // (open question: how can we do this more efficiently?)
+ batch[i] = r.from(j.GetJobs()[i])
+ }
+
+ err := r.p.PushBatch(batch)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (r *rpc) Pause(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
+ pipelines := make([]string, len(req.GetPipelines()))
+
+ for i := 0; i < len(pipelines); i++ {
+ pipelines[i] = req.GetPipelines()[i]
+ }
+
+ r.p.Pause(pipelines)
+ return nil
+}
+
+func (r *rpc) Resume(req *jobsv1beta.MaintenanceRequest, _ *jobsv1beta.Empty) error {
+ pipelines := make([]string, len(req.GetPipelines()))
+
+ for i := 0; i < len(pipelines); i++ {
+ pipelines[i] = req.GetPipelines()[i]
+ }
+
+ r.p.Resume(pipelines)
+ return nil
+}
+
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.List) error {
+ resp.Pipelines = r.p.List()
+ return nil
+}
+
+// from converts from transport entity to domain
+func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
+ headers := map[string][]string{}
+
+ for k, v := range j.GetHeaders() {
+ headers[k] = v.GetValue()
+ }
+
+ jb := &job.Job{
+ Job: j.GetJob(),
+ Headers: headers,
+ Ident: j.GetId(),
+ Payload: j.GetPayload(),
+ Options: &job.Options{
+ Priority: j.GetOptions().GetPriority(),
+ Pipeline: j.GetOptions().GetPipeline(),
+ Delay: j.GetOptions().GetDelay(),
+ Attempts: j.GetOptions().GetAttempts(),
+ RetryDelay: j.GetOptions().GetRetryDelay(),
+ Timeout: j.GetOptions().GetTimeout(),
+ },
+ }
+
+ return jb
+}
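
A client-side sketch of the Push RPC for illustration: the "jobs.Push" service name follows the usual plugin-name.Method convention and, together with the connection setup (assumed here to be an already-connected net/rpc client using RoadRunner's codec), is an assumption rather than something shown in this patch.

    package example

    import (
        "net/rpc"

        jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
    )

    // pushOverRPC assumes client is already connected to the RoadRunner RPC endpoint.
    func pushOverRPC(client *rpc.Client) error {
        req := &jobsv1beta.PushRequest{
            Job: &jobsv1beta.Job{
                Job:     "App\\Jobs\\SendEmail", // hypothetical handler name
                Id:      "job-1",                // must be non-empty, see Push above
                Payload: `{"to":"user@example.com"}`,
                Options: &jobsv1beta.Options{
                    Pipeline: "test-local",
                    Priority: 10,
                },
            },
        }

        return client.Call("jobs.Push", req, &jobsv1beta.Empty{})
    }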
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 47d37cc2..15a5674f 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
@@ -34,7 +33,7 @@ type Driver struct {
stop chan struct{}
}
-func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
d := &Driver{
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 6ae1a1f6..c839130f 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -2,12 +2,15 @@ package boltdb
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "boltdb"
+const (
+ PluginName string = "boltdb"
+ RootPluginName string = "kv"
+)
// Plugin BoltDB K/V storage.
type Plugin struct {
@@ -21,7 +24,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 14e7c078..e24747fe 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -7,7 +7,6 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -21,7 +20,7 @@ type Driver struct {
// NewMemcachedDriver returns a memcache client using the provided server(s)
// with equal weight. If a server is listed multiple times,
// it gets a proportional amount of weight.
-func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_memcached_driver")
s := &Driver{
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 22ea5cca..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -2,12 +2,15 @@ package memcached
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "memcached"
+const (
+ PluginName string = "memcached"
+ RootPluginName string = "kv"
+)
type Plugin struct {
// config plugin
@@ -17,7 +20,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 03dbaed6..e9ea25df 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -5,10 +5,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+// PluginName is linked to the memory, boltdb, memcached and redis plugins. DO NOT change w/o sync.
const PluginName string = "kv"
const (
@@ -25,9 +27,9 @@ const (
type Plugin struct {
log logger.Logger
// constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
- constructors map[string]Constructor
+ constructors map[string]kv.Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
- storages map[string]Storage
+ storages map[string]kv.Storage
// KV configuration
cfg Config
cfgPlugin config.Configurer
@@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.constructors = make(map[string]Constructor, 5)
- p.storages = make(map[string]Storage, 5)
+ p.constructors = make(map[string]kv.Constructor, 5)
+ p.storages = make(map[string]kv.Storage, 5)
p.log = log
p.cfgPlugin = cfg
return nil
@@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) {
// save the storage constructor
p.constructors[name.Name()] = constructor
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 3f7ba97c..ad4aefa9 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,6 +2,7 @@ package kv
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -9,7 +10,7 @@ import (
// Wrapper for the plugin
type rpc struct {
// all available storages
- storages map[string]Storage
+ storages map[string]kv.Storage
// svc is a plugin implementing Storage interface
srv *Plugin
// Logger
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index c13c2314..68ea7266 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -7,7 +7,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -21,7 +20,7 @@ type Driver struct {
cfg *Config
}
-func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (kv.Storage, error) {
+func NewInMemoryDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) {
const op = errors.Op("new_in_memory_driver")
d := &Driver{
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 70badf15..7d418a70 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,9 +2,9 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,6 @@ type Plugin struct {
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.log = log
p.cfgPlugin = cfg
p.stop = make(chan struct{}, 1)
return nil
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index d027a8a5..c79f3eb0 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -3,8 +3,8 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -17,7 +17,7 @@ type PubSubDriver struct {
log logger.Logger
}
-func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
+func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
ps := &PubSubDriver{
pushCh: make(chan *pubsub.Message, 10),
storage: bst.NewBST(),
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
index 5817853c..0cd62d19 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/channel.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 2e4b9bfd..29f89d46 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -8,7 +8,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
@@ -20,7 +19,7 @@ type Driver struct {
cfg *Config
}
-func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
const op = errors.Op("new_boltdb_driver")
d := &Driver{
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 9d98790b..3c62a63f 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,9 +5,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 4e41acb5..01efc623 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,7 @@ type PubSubDriver struct {
stopCh chan struct{}
}
-func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (pubsub.PubSub, error) {
+func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (*PubSubDriver, error) {
const op = errors.Op("new_pub_sub_driver")
ps := &PubSubDriver{
log: log,
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 0424d52d..b0f84a7f 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -19,5 +19,5 @@ type Server interface {
// NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
// NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
- NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
+ NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
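
A small caller-side sketch of the updated signature (pool sizing and the env key are arbitrary); the pool config is now passed by pointer:

    package example

    import (
        "context"

        "github.com/spiral/roadrunner/v2/pkg/pool"
        "github.com/spiral/roadrunner/v2/plugins/server"
    )

    // newPoolSketch shows the pool config being passed by pointer after this change.
    func newPoolSketch(srv server.Server) (pool.Pool, error) {
        return srv.NewWorkerPool(context.Background(), &pool.Config{
            NumWorkers: 4, // arbitrary
        }, map[string]string{"rr_example": "true"})
    }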
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 00639f43..1694cdf1 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -21,14 +21,14 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-// PluginName for the server
-const PluginName = "server"
-
-// RrRelay env variable key (internal)
-const RrRelay = "RR_RELAY"
-
-// RrRPC env variable key (internal) if the RPC presents
-const RrRPC = "RR_RPC"
+const (
+ // PluginName for the server
+ PluginName = "server"
+ // RrRelay env variable key (internal)
+ RrRelay = "RR_RELAY"
+ // RrRPC env variable key (internal) if the RPC presents
+ RrRPC = "RR_RPC"
+)
// Plugin manages worker
type Plugin struct {
@@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
const op = errors.Op("server_plugin_new_worker")
list := make([]events.Listener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
+ list = append(list, server.collectWorkerEvents)
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -140,15 +140,16 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
}
// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
const op = errors.Op("server_plugin_new_worker_pool")
+
spawnCmd, err := server.CmdFactory(env)
if err != nil {
return nil, errors.E(op, err)
}
- list := make([]events.Listener, 0, 1)
- list = append(list, server.collectEvents)
+ list := make([]events.Listener, 0, 2)
+ list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
if len(listeners) != 0 {
list = append(list, listeners...)
}
@@ -209,7 +210,7 @@ func (server *Plugin) setEnv(e Env) []string {
return env
}
-func (server *Plugin) collectEvents(event interface{}) {
+func (server *Plugin) collectPoolEvents(event interface{}) {
if we, ok := event.(events.PoolEvent); ok {
switch we.Event {
case events.EventMaxMemory:
@@ -238,7 +239,9 @@ func (server *Plugin) collectEvents(event interface{}) {
server.log.Warn("requested pool restart")
}
}
+}
+func (server *Plugin) collectWorkerEvents(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
@@ -263,17 +266,3 @@ func (server *Plugin) collectEvents(event interface{}) {
}
}
}
-
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
- case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- // stderr event is INFO level
- case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- }
- }
-}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 664b4dfd..c1f79a78 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,7 +7,7 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 1115bd10..6c119e57 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -109,7 +109,7 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
@@ -243,13 +243,13 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}
// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []process.State {
+func (p *Plugin) Workers() []*process.State {
p.RLock()
defer p.RUnlock()
workers := p.workers()
- ps := make([]process.State, 0, len(workers))
+ ps := make([]*process.State, 0, len(workers))
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
if err != nil {
@@ -276,7 +276,7 @@ func (p *Plugin) Reset() error {
p.phpPool = nil
var err error
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
+ p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
MaxJobs: p.cfg.Pool.MaxJobs,
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 752ba3ce..758620f6 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,7 @@ import (
"sync"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
index e666f846..2685da7f 100644
--- a/plugins/websockets/validator/access_validator.go
+++ b/plugins/websockets/validator/access_validator.go
@@ -12,6 +12,11 @@ import (
type AccessValidatorFn = func(r *http.Request, channels ...string) (*AccessValidator, error)
+const (
+ joinServer string = "ws:joinServer"
+ joinTopics string = "ws:joinTopics"
+)
+
type AccessValidator struct {
Header http.Header `json:"headers"`
Status int `json:"status"`
@@ -26,7 +31,7 @@ func ServerAccessValidator(r *http.Request) ([]byte, error) {
return nil, errors.E(op, err)
}
- defer delete(attributes.All(r), "ws:joinServer")
+ defer delete(attributes.All(r), joinServer)
req := &handler.Request{
RemoteAddr: handler.FetchIP(r.RemoteAddr),
@@ -54,7 +59,7 @@ func TopicsAccessValidator(r *http.Request, topics ...string) ([]byte, error) {
return nil, errors.E(op, err)
}
- defer delete(attributes.All(r), "ws:joinTopics")
+ defer delete(attributes.All(r), joinTopics)
req := &handler.Request{
RemoteAddr: handler.FetchIP(r.RemoteAddr),
diff --git a/proto/jobs/v1beta/jobs.pb.go b/proto/jobs/v1beta/jobs.pb.go
new file mode 100644
index 00000000..b445ca3f
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.pb.go
@@ -0,0 +1,676 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
+// source: jobs.proto
+
+package jobsv1beta
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// single job request
+type PushRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"`
+}
+
+func (x *PushRequest) Reset() {
+ *x = PushRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PushRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PushRequest) ProtoMessage() {}
+
+func (x *PushRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PushRequest.ProtoReflect.Descriptor instead.
+func (*PushRequest) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *PushRequest) GetJob() *Job {
+ if x != nil {
+ return x.Job
+ }
+ return nil
+}
+
+// batch jobs request
+type PushBatchRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Jobs []*Job `protobuf:"bytes,1,rep,name=jobs,proto3" json:"jobs,omitempty"`
+}
+
+func (x *PushBatchRequest) Reset() {
+ *x = PushBatchRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PushBatchRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PushBatchRequest) ProtoMessage() {}
+
+func (x *PushBatchRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PushBatchRequest.ProtoReflect.Descriptor instead.
+func (*PushBatchRequest) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *PushBatchRequest) GetJobs() []*Job {
+ if x != nil {
+ return x.Jobs
+ }
+ return nil
+}
+
+// request to pause/resume
+type MaintenanceRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"`
+}
+
+func (x *MaintenanceRequest) Reset() {
+ *x = MaintenanceRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *MaintenanceRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MaintenanceRequest) ProtoMessage() {}
+
+func (x *MaintenanceRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use MaintenanceRequest.ProtoReflect.Descriptor instead.
+func (*MaintenanceRequest) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *MaintenanceRequest) GetPipelines() []string {
+ if x != nil {
+ return x.Pipelines
+ }
+ return nil
+}
+
+// some endpoints receives nothing
+// all endpoints returns nothing
+type Empty struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *Empty) Reset() {
+ *x = Empty{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Empty) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Empty) ProtoMessage() {}
+
+func (x *Empty) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Empty.ProtoReflect.Descriptor instead.
+func (*Empty) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{3}
+}
+
+type List struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Pipelines []string `protobuf:"bytes,1,rep,name=pipelines,proto3" json:"pipelines,omitempty"`
+}
+
+func (x *List) Reset() {
+ *x = List{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *List) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*List) ProtoMessage() {}
+
+func (x *List) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use List.ProtoReflect.Descriptor instead.
+func (*List) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *List) GetPipelines() []string {
+ if x != nil {
+ return x.Pipelines
+ }
+ return nil
+}
+
+type Job struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Job string `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"`
+ Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
+ Payload string `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
+ Headers map[string]*HeaderValue `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+ Options *Options `protobuf:"bytes,4,opt,name=options,proto3" json:"options,omitempty"`
+}
+
+func (x *Job) Reset() {
+ *x = Job{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Job) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Job) ProtoMessage() {}
+
+func (x *Job) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Job.ProtoReflect.Descriptor instead.
+func (*Job) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *Job) GetJob() string {
+ if x != nil {
+ return x.Job
+ }
+ return ""
+}
+
+func (x *Job) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *Job) GetPayload() string {
+ if x != nil {
+ return x.Payload
+ }
+ return ""
+}
+
+func (x *Job) GetHeaders() map[string]*HeaderValue {
+ if x != nil {
+ return x.Headers
+ }
+ return nil
+}
+
+func (x *Job) GetOptions() *Options {
+ if x != nil {
+ return x.Options
+ }
+ return nil
+}
+
+type HeaderValue struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Value []string `protobuf:"bytes,1,rep,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *HeaderValue) Reset() {
+ *x = HeaderValue{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *HeaderValue) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*HeaderValue) ProtoMessage() {}
+
+func (x *HeaderValue) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use HeaderValue.ProtoReflect.Descriptor instead.
+func (*HeaderValue) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *HeaderValue) GetValue() []string {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
+type Options struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Priority int64 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"`
+ Pipeline string `protobuf:"bytes,2,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
+ Delay int64 `protobuf:"varint,3,opt,name=delay,proto3" json:"delay,omitempty"`
+ Attempts int64 `protobuf:"varint,4,opt,name=attempts,proto3" json:"attempts,omitempty"`
+ RetryDelay int64 `protobuf:"varint,5,opt,name=retry_delay,json=retryDelay,proto3" json:"retry_delay,omitempty"`
+ Timeout int64 `protobuf:"varint,6,opt,name=timeout,proto3" json:"timeout,omitempty"`
+}
+
+func (x *Options) Reset() {
+ *x = Options{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_jobs_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Options) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Options) ProtoMessage() {}
+
+func (x *Options) ProtoReflect() protoreflect.Message {
+ mi := &file_jobs_proto_msgTypes[7]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Options.ProtoReflect.Descriptor instead.
+func (*Options) Descriptor() ([]byte, []int) {
+ return file_jobs_proto_rawDescGZIP(), []int{7}
+}
+
+func (x *Options) GetPriority() int64 {
+ if x != nil {
+ return x.Priority
+ }
+ return 0
+}
+
+func (x *Options) GetPipeline() string {
+ if x != nil {
+ return x.Pipeline
+ }
+ return ""
+}
+
+func (x *Options) GetDelay() int64 {
+ if x != nil {
+ return x.Delay
+ }
+ return 0
+}
+
+func (x *Options) GetAttempts() int64 {
+ if x != nil {
+ return x.Attempts
+ }
+ return 0
+}
+
+func (x *Options) GetRetryDelay() int64 {
+ if x != nil {
+ return x.RetryDelay
+ }
+ return 0
+}
+
+func (x *Options) GetTimeout() int64 {
+ if x != nil {
+ return x.Timeout
+ }
+ return 0
+}
+
+var File_jobs_proto protoreflect.FileDescriptor
+
+var file_jobs_proto_rawDesc = []byte{
+ 0x0a, 0x0a, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6a, 0x6f,
+ 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x31, 0x0a, 0x0b, 0x50, 0x75, 0x73,
+ 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x22, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62,
+ 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0x38, 0x0a, 0x10,
+ 0x50, 0x75, 0x73, 0x68, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x12, 0x24, 0x0a, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10,
+ 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62,
+ 0x52, 0x04, 0x6a, 0x6f, 0x62, 0x73, 0x22, 0x32, 0x0a, 0x12, 0x4d, 0x61, 0x69, 0x6e, 0x74, 0x65,
+ 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09,
+ 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52,
+ 0x09, 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d,
+ 0x70, 0x74, 0x79, 0x22, 0x24, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x70,
+ 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09,
+ 0x70, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x22, 0x80, 0x02, 0x0a, 0x03, 0x4a, 0x6f,
+ 0x62, 0x12, 0x10, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
+ 0x6a, 0x6f, 0x62, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x37, 0x0a,
+ 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d,
+ 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4a, 0x6f, 0x62,
+ 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68,
+ 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
+ 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f,
+ 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x54, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72,
+ 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6a, 0x6f, 0x62, 0x73, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75,
+ 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x23, 0x0a, 0x0b,
+ 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76,
+ 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x22, 0xae, 0x01, 0x0a, 0x07, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a,
+ 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
+ 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x69, 0x70,
+ 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x70,
+ 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x03,
+ 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x1a, 0x0a, 0x08, 0x61,
+ 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x61,
+ 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, 0x79,
+ 0x5f, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
+ 0x74, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
+ 0x6f, 0x75, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
+ 0x75, 0x74, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x6a, 0x6f, 0x62, 0x73, 0x76, 0x31, 0x62,
+ 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_jobs_proto_rawDescOnce sync.Once
+ file_jobs_proto_rawDescData = file_jobs_proto_rawDesc
+)
+
+func file_jobs_proto_rawDescGZIP() []byte {
+ file_jobs_proto_rawDescOnce.Do(func() {
+ file_jobs_proto_rawDescData = protoimpl.X.CompressGZIP(file_jobs_proto_rawDescData)
+ })
+ return file_jobs_proto_rawDescData
+}
+
+var file_jobs_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
+var file_jobs_proto_goTypes = []interface{}{
+ (*PushRequest)(nil), // 0: jobs.v1beta.PushRequest
+ (*PushBatchRequest)(nil), // 1: jobs.v1beta.PushBatchRequest
+ (*MaintenanceRequest)(nil), // 2: jobs.v1beta.MaintenanceRequest
+ (*Empty)(nil), // 3: jobs.v1beta.Empty
+ (*List)(nil), // 4: jobs.v1beta.List
+ (*Job)(nil), // 5: jobs.v1beta.Job
+ (*HeaderValue)(nil), // 6: jobs.v1beta.HeaderValue
+ (*Options)(nil), // 7: jobs.v1beta.Options
+ nil, // 8: jobs.v1beta.Job.HeadersEntry
+}
+var file_jobs_proto_depIdxs = []int32{
+ 5, // 0: jobs.v1beta.PushRequest.job:type_name -> jobs.v1beta.Job
+ 5, // 1: jobs.v1beta.PushBatchRequest.jobs:type_name -> jobs.v1beta.Job
+ 8, // 2: jobs.v1beta.Job.headers:type_name -> jobs.v1beta.Job.HeadersEntry
+ 7, // 3: jobs.v1beta.Job.options:type_name -> jobs.v1beta.Options
+ 6, // 4: jobs.v1beta.Job.HeadersEntry.value:type_name -> jobs.v1beta.HeaderValue
+ 5, // [5:5] is the sub-list for method output_type
+ 5, // [5:5] is the sub-list for method input_type
+ 5, // [5:5] is the sub-list for extension type_name
+ 5, // [5:5] is the sub-list for extension extendee
+ 0, // [0:5] is the sub-list for field type_name
+}
+
+func init() { file_jobs_proto_init() }
+func file_jobs_proto_init() {
+ if File_jobs_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_jobs_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PushRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PushBatchRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*MaintenanceRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Empty); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*List); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Job); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*HeaderValue); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_jobs_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Options); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_jobs_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 9,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_jobs_proto_goTypes,
+ DependencyIndexes: file_jobs_proto_depIdxs,
+ MessageInfos: file_jobs_proto_msgTypes,
+ }.Build()
+ File_jobs_proto = out.File
+ file_jobs_proto_rawDesc = nil
+ file_jobs_proto_goTypes = nil
+ file_jobs_proto_depIdxs = nil
+}
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
new file mode 100644
index 00000000..1bcddf4f
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.proto
@@ -0,0 +1,49 @@
+syntax = "proto3";
+
+package jobs.v1beta;
+option go_package = "./;jobsv1beta";
+
+// single job request
+message PushRequest {
+ Job job = 1;
+}
+
+// batch jobs request
+message PushBatchRequest {
+ repeated Job jobs = 1;
+}
+
+// request to pause/resume
+message MaintenanceRequest {
+ repeated string pipelines = 1;
+}
+
+// some endpoints receive nothing
+// all endpoints return nothing
+message Empty {}
+
+message List {
+ repeated string pipelines = 1;
+}
+
+message Job {
+ string job = 1;
+ string id = 2;
+ string payload = 3;
+ map<string, HeaderValue> headers = 5;
+ Options options = 4;
+}
+
+message HeaderValue {
+ repeated string value = 1;
+}
+
+message Options {
+ int64 priority = 1;
+ string pipeline = 2;
+ int64 delay = 3;
+ int64 attempts = 4;
+ int64 retry_delay = 5;
+ int64 timeout = 6;
+}
+
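Note: for reference only (not part of the patch), a minimal Go sketch of calling the new jobs RPC endpoints with these generated jobsv1beta types. It mirrors the test helpers added below in tests/plugins/jobs/jobs_plugin_test.go; the RPC address is assumed to match the test configs (tcp://127.0.0.1:6001).

    // push_sketch.go - hypothetical client, not shipped with this change set
    package main

    import (
        "net"
        "net/rpc"

        goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
        jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
    )

    func main() {
        // Dial the RPC plugin (address taken from the test configs below).
        conn, err := net.Dial("tcp", "127.0.0.1:6001")
        if err != nil {
            panic(err)
        }
        client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))

        // Build a PushRequest from the generated types defined in jobs.proto.
        req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
            Job:     "some/php/namespace",
            Id:      "1",
            Payload: `{"hello":"world"}`,
            Options: &jobsv1beta.Options{
                Priority: 1,
                Pipeline: "test-local",
            },
        }}

        // Empty is the response type for endpoints that return nothing.
        resp := &jobsv1beta.Empty{}
        if err := client.Call("jobs.Push", req, resp); err != nil {
            panic(err)
        }
    }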
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 622967b8..1e38fe12 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -1,16 +1,17 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
// source: kv.proto
package kvv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index ad4ebbe7..b07c271e 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -1,16 +1,17 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.16.0
+// protoc-gen-go v1.27.1
+// protoc v3.17.3
// source: websockets.proto
package websocketsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/tests/Dockerfile b/tests/Dockerfile
deleted file mode 100644
index e69de29b..00000000
--- a/tests/Dockerfile
+++ /dev/null
diff --git a/tests/composer.json b/tests/composer.json
index 50178d1f..fa5925b7 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -2,7 +2,7 @@
"minimum-stability": "beta",
"prefer-stable": true,
"require": {
- "nyholm/psr7": "^1.3",
+ "nyholm/psr7": "^1.4",
"spiral/roadrunner": "^2.0",
"spiral/roadrunner-http": "^2.0",
"temporal/sdk": ">=1.0",
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
deleted file mode 100644
index b6ba0f66..00000000
--- a/tests/docker-compose.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-version: '3'
-
-services:
- memcached:
- image: memcached:latest
- ports:
- - "0.0.0.0:11211:11211"
- redis:
- image: redis:6
- ports:
- - "6379:6379"
- redis2:
- image: redis:6
- ports:
- - "6378:6379"
diff --git a/tests/env/Dockerfile-elastic-mq.yaml b/tests/env/Dockerfile-elastic-mq.yaml
new file mode 100644
index 00000000..c9f909d0
--- /dev/null
+++ b/tests/env/Dockerfile-elastic-mq.yaml
@@ -0,0 +1,9 @@
+FROM openjdk:16
+
+ADD https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-1.1.1.jar /
+COPY custom.conf /
+ENTRYPOINT ["java", "-Dconfig.file=custom.conf", "-jar", "/elasticmq-server-1.1.1.jar"]
+
+EXPOSE 9324
+
+CMD ["-help"]
diff --git a/tests/env/custom.conf b/tests/env/custom.conf
new file mode 100644
index 00000000..9be7730e
--- /dev/null
+++ b/tests/env/custom.conf
@@ -0,0 +1,8 @@
+include classpath("application.conf")
+
+node-address {
+ protocol = http
+ host = "*"
+ port = 9324
+ context-path = ""
+}
diff --git a/tests/env/docker-compose.yaml b/tests/env/docker-compose.yaml
new file mode 100644
index 00000000..a407fed4
--- /dev/null
+++ b/tests/env/docker-compose.yaml
@@ -0,0 +1,37 @@
+version: '3'
+
+services:
+ memcached:
+ image: memcached:latest
+ ports:
+ - "0.0.0.0:11211:11211"
+ redis:
+ image: redis:6
+ ports:
+ - "6379:6379"
+ redis2:
+ image: redis:6
+ ports:
+ - "6378:6379"
+
+ beanstalk:
+ image: schickling/beanstalkd
+ ports:
+ - "11300:11300"
+
+ sqs:
+ build:
+ context: .
+ dockerfile: Dockerfile-elastic-mq.yaml
+ ports:
+ - "9324:9324"
+
+ rabbitmq:
+ image: rabbitmq:3-management
+ environment:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ RABBITMQ_DEFAULT_VHOST: /
+ ports:
+ - "15672:15672"
+ - "5672:5672"
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
index 0ec813f3..d8bedf29 100644
--- a/tests/plugins/broadcast/broadcast_plugin_test.go
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -176,7 +176,7 @@ func TestBroadcastNoConfig(t *testing.T) {
}
func TestBroadcastSameSubscriber(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -189,11 +189,11 @@ func TestBroadcastSameSubscriber(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6002", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -279,14 +279,15 @@ func TestBroadcastSameSubscriber(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6002"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6002"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+
+ time.Sleep(time.Second * 5)
}
func TestBroadcastSameSubscriberGlobal(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second))
assert.NoError(t, err)
cfg := &config.Viper{
@@ -299,11 +300,11 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
- mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).MinTimes(1)
- mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).MinTimes(1)
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6003", "services", []string{"broadcast"}).AnyTimes()
+ mockLogger.EXPECT().Debug("message published", "msg", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Info(`plugin1: {foo hello}`).Times(3)
- mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(3)
+ mockLogger.EXPECT().Info(`plugin1: {foo2 hello}`).Times(2)
mockLogger.EXPECT().Info(`plugin1: {foo3 hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin2: {foo hello}`).Times(3)
mockLogger.EXPECT().Info(`plugin3: {foo hello}`).Times(3)
@@ -389,10 +390,10 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
t.Run("PublishHelloFoo3", BroadcastPublishFoo3("6003"))
t.Run("PublishAsyncHelloFooFoo2Foo3", BroadcastPublishAsyncFooFoo2Foo3("6003"))
- time.Sleep(time.Second * 4)
stopCh <- struct{}{}
wg.Wait()
+ time.Sleep(time.Second * 5)
}
func BroadcastPublishFooFoo2Foo3(port string) func(t *testing.T) {
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
index d8daa251..66114d64 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
index 2ca97055..ea25988c 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-global.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -38,9 +36,4 @@ broadcast:
logs:
mode: development
- level: error
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
index 360e05e5..cbe18196 100644
--- a/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-same-section.yaml
@@ -3,8 +3,6 @@ rpc:
server:
command: "php ../../psr-worker-bench.php"
- user: ""
- group: ""
relay: "pipes"
relay_timeout: "20s"
@@ -35,9 +33,4 @@ broadcast:
logs:
mode: development
- level: debug
-
-endure:
- grace_period: 120s
- print_graph: false
- log_level: error
+ level: info
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index d3b16256..01ad1479 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,14 @@ type Plugin1 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin1) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -39,16 +42,22 @@ func (p *Plugin1) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
+ }
}
}()
@@ -59,6 +68,8 @@ func (p *Plugin1) Stop() error {
_ = p.driver.Unsubscribe("1", "foo")
_ = p.driver.Unsubscribe("1", "foo2")
_ = p.driver.Unsubscribe("1", "foo3")
+
+ p.exit <- struct{}{}
return nil
}
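The same exit-channel shutdown pattern is applied to plugin2 through plugin6 below. For readability, a condensed sketch of how the Serve loop reads after this patch; the errCh declaration, driver setup, and imports live in the unchanged part of Serve and are assumed here, and serveLoop is a hypothetical name used only for this sketch.

    // Condensed view of the post-patch consumer loop: read from the pubsub
    // driver until Stop signals the exit channel; report read errors through
    // errCh instead of panicking.
    func (p *Plugin1) serveLoop(errCh chan error) {
        go func() {
            for {
                select {
                case <-p.exit:
                    return
                default:
                    msg, err := p.driver.Next()
                    if err != nil {
                        errCh <- err
                        return
                    }
                    if msg == nil {
                        continue
                    }
                    p.log.Info(fmt.Sprintf("%s: %s", Plugin1Name, *msg))
                }
            }
        }()
    }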
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index 2bd819d2..ee072ffe 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,13 @@ type Plugin2 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+ exit chan struct{}
}
func (p *Plugin2) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +40,22 @@ func (p *Plugin2) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin2Name, *msg))
+ }
}
}()
@@ -56,6 +64,7 @@ func (p *Plugin2) Serve() chan error {
func (p *Plugin2) Stop() error {
_ = p.driver.Unsubscribe("2", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index ef926222..288201d1 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin3 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin3) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin3) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin3Name, *msg))
+ }
}
}()
@@ -56,6 +66,7 @@ func (p *Plugin3) Serve() chan error {
func (p *Plugin3) Stop() error {
_ = p.driver.Unsubscribe("3", "foo")
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index c9b94777..56f79c0f 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin4 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin4) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin4) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin4Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin4) Serve() chan error {
func (p *Plugin4) Stop() error {
_ = p.driver.Unsubscribe("4", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index 01562a8f..e7cd7e60 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin5 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin5) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin5) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin5Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin5) Serve() chan error {
func (p *Plugin5) Stop() error {
_ = p.driver.Unsubscribe("5", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 76f2d6e8..08272196 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -14,11 +14,15 @@ type Plugin6 struct {
log logger.Logger
b broadcast.Broadcaster
driver pubsub.SubReader
+
+ exit chan struct{}
}
func (p *Plugin6) Init(log logger.Logger, b broadcast.Broadcaster) error {
p.log = log
p.b = b
+
+ p.exit = make(chan struct{}, 1)
return nil
}
@@ -38,16 +42,22 @@ func (p *Plugin6) Serve() chan error {
go func() {
for {
- msg, err := p.driver.Next()
- if err != nil {
- panic(err)
- }
+ select {
+ case <-p.exit:
+ return
+ default:
+ msg, err := p.driver.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
- if msg == nil {
- continue
- }
+ if msg == nil {
+ continue
+ }
- p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ p.log.Info(fmt.Sprintf("%s: %s", Plugin6Name, *msg))
+ }
}
}()
@@ -56,6 +66,8 @@ func (p *Plugin6) Serve() chan error {
func (p *Plugin6) Stop() error {
_ = p.driver.Unsubscribe("6", "foo")
+
+ p.exit <- struct{}{}
return nil
}
diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml
index 9d2ef7e5..b4e960f1 100644
--- a/tests/plugins/headers/configs/.rr-cors-headers.yaml
+++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml
@@ -1,9 +1,5 @@
server:
command: "php ../../http/client.php headers pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/http/configs/.rr-env.yaml b/tests/plugins/http/configs/.rr-env.yaml
index 99358b04..4ea8ec73 100644
--- a/tests/plugins/http/configs/.rr-env.yaml
+++ b/tests/plugins/http/configs/.rr-env.yaml
@@ -3,17 +3,13 @@ rpc:
server:
command: "php ../../http/client.php env pipes"
- user: ""
- group: ""
- env:
- "env_key": "ENV_VALUE"
relay: "pipes"
relay_timeout: "20s"
http:
address: 127.0.0.1:12084
max_request_size: 1024
- middleware: [ "" ]
+ middleware: []
env:
"RR_HTTP": "true"
"env_key": "ENV_VALUE"
diff --git a/tests/plugins/http/configs/.rr-http.yaml b/tests/plugins/http/configs/.rr-http.yaml
index c95bc049..b4910160 100644
--- a/tests/plugins/http/configs/.rr-http.yaml
+++ b/tests/plugins/http/configs/.rr-http.yaml
@@ -3,10 +3,6 @@ rpc:
server:
command: "php ../../http/client.php echo pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 40e3a720..37d9452c 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -26,7 +26,7 @@ func TestHandler_Echo(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -77,7 +77,7 @@ func TestHandler_Headers(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "header", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -138,7 +138,7 @@ func TestHandler_Empty_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -198,7 +198,7 @@ func TestHandler_User_Agent(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "user-agent", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -258,7 +258,7 @@ func TestHandler_Cookies(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "cookie", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -323,7 +323,7 @@ func TestHandler_JsonPayload_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -387,7 +387,7 @@ func TestHandler_JsonPayload_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -447,7 +447,7 @@ func TestHandler_JsonPayload_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "payload", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -507,7 +507,7 @@ func TestHandler_FormData_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -580,7 +580,7 @@ func TestHandler_FormData_POST_Overwrite(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -653,7 +653,7 @@ func TestHandler_FormData_POST_Form_UrlEncoded_Charset(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -725,7 +725,7 @@ func TestHandler_FormData_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -797,7 +797,7 @@ func TestHandler_FormData_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -869,7 +869,7 @@ func TestHandler_Multipart_POST(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -983,7 +983,7 @@ func TestHandler_Multipart_PUT(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1097,7 +1097,7 @@ func TestHandler_Multipart_PATCH(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "data", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1213,7 +1213,7 @@ func TestHandler_Error(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1259,7 +1259,7 @@ func TestHandler_Error2(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error2", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1305,7 +1305,7 @@ func TestHandler_Error3(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1364,7 +1364,7 @@ func TestHandler_ResponseDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1425,7 +1425,7 @@ func TestHandler_ResponseDurationDelayed(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echoDelay", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1485,7 +1485,7 @@ func TestHandler_ErrorDuration(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1560,7 +1560,7 @@ func TestHandler_IP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1621,7 +1621,7 @@ func TestHandler_XRealIP(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1687,7 +1687,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1752,7 +1752,7 @@ func TestHandler_XForwardedFor_NotTrustedRemoteIp(t *testing.T) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "ip", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
p, err := pool.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- pool.Config{
+ &pool.Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/http/uploads_test.go b/tests/plugins/http/uploads_test.go
index df696668..d02f9eee 100644
--- a/tests/plugins/http/uploads_test.go
+++ b/tests/plugins/http/uploads_test.go
@@ -31,7 +31,7 @@ func TestHandler_Upload_File(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -114,7 +114,7 @@ func TestHandler_Upload_NestedFile(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -197,7 +197,7 @@ func TestHandler_Upload_File_NoTmpDir(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
@@ -280,7 +280,7 @@ func TestHandler_Upload_File_Forbids(t *testing.T) {
pool, err := poolImpl.Initialize(context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "upload", "pipes") },
pipe.NewPipeFactory(),
- poolImpl.Config{
+ &poolImpl.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 43335999..095140b8 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -10,7 +10,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
@@ -51,13 +51,13 @@ func (p1 *Plugin1) Name() string {
func (p1 *Plugin1) Available() {}
-func (p1 *Plugin1) Workers() []process.State {
+func (p1 *Plugin1) Workers() []*process.State {
p, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil)
if err != nil {
panic(err)
}
- ps := make([]process.State, 0, len(p.Workers()))
+ ps := make([]*process.State, 0, len(p.Workers()))
workers := p.Workers()
for i := 0; i < len(workers); i++ {
state, err := process.WorkerProcessState(workers[i])
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml b/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml
new file mode 100644
index 00000000..0d141b2b
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-init-no-amqp-global.yaml
@@ -0,0 +1,75 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+logs:
+ level: error
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ # priority: 11 - not defined, 10 by default
+ # driver locality not specified, local by default
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
new file mode 100644
index 00000000..63ddc70d
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -0,0 +1,93 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exclusive: true
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-list.yaml b/tests/plugins/jobs/configs/.rr-jobs-list.yaml
new file mode 100644
index 00000000..3d22a098
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-list.yaml
@@ -0,0 +1,91 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+ declare:
+ MessageRetentionPeriod: 86400
+
+logs:
+ level: debug
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ # priority: 11 - not defined, 10 by default
+ # driver locality not specified, local by default
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml
new file mode 100644
index 00000000..8789e872
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-all.yaml
@@ -0,0 +1,78 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+logs:
+ level: debug
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ # priority: 11 - not defined, 10 by default
+ # driver locality not specified, local by default
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml
new file mode 100644
index 00000000..8789e872
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-amqp.yaml
@@ -0,0 +1,78 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+logs:
+ level: debug
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ # priority: 11 - not defined, 10 by default
+ # driver locality not specified, local by default
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml
new file mode 100644
index 00000000..dc5bc3a1
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-pause-resume-ephemeral.yaml
@@ -0,0 +1,44 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+logs:
+ level: info
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 10
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2" ]
+
diff --git a/tests/plugins/jobs/configs/.rr-jobs-test.yaml b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
new file mode 100644
index 00000000..ee72c2b7
--- /dev/null
+++ b/tests/plugins/jobs/configs/.rr-jobs-test.yaml
@@ -0,0 +1,105 @@
+rpc:
+ listen: unix:///tmp/rr.sock
+
+server:
+ command: "php ../../client.php echo pipes"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+ session_token: ""
+ ping_period: 10
+ attributes:
+ MessageRetentionPeriod: 86400
+
+logs:
+ level: info
+ encoding: console
+ mode: development
+
+jobs:
+ # num logical cores by default
+ num_pollers: 64
+ # 1mi by default
+ pipeline_size: 100000
+ # worker pool configuration
+ pool:
+ num_workers: 10
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ test-local:
+ driver: ephemeral
+ priority: 10
+ pipeline_size: 10000
+
+ test-local-2:
+ driver: ephemeral
+ priority: 1
+ pipeline_size: 10000
+
+ test-local-3:
+ driver: ephemeral
+ priority: 2
+ pipeline_size: 10000
+
+ test-1:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-4:
+ driver: amqp
+ priority: 1
+ pipeline_size: 1000000
+ queue: test-1-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test
+
+ test-2-amqp:
+ driver: amqp
+ priority: 2
+ pipeline_size: 100000
+ queue: test-2-queue
+ exchange: default
+ exchange_type: direct
+ routing_key: test-2
+
+ test-2:
+ driver: beanstalk
+ priority: 11
+ tube: default
+ pipeline_size: 1000000
+
+ test-3:
+ driver: sqs
+ pipeline_size: 1000000
+ queue: default
+ attributes:
+ MessageRetentionPeriod: 86400
+ tags:
+ test: "tag"
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: [ "test-local", "test-local-2", "test-local-3", "test-1", "test-2-amqp", "test-3" ]
+
diff --git a/tests/plugins/jobs/jobs_plugin_test.go b/tests/plugins/jobs/jobs_plugin_test.go
new file mode 100644
index 00000000..f8a5c3e7
--- /dev/null
+++ b/tests/plugins/jobs/jobs_plugin_test.go
@@ -0,0 +1,404 @@
+package jobs
+
+import (
+ "net"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/golang/mock/gomock"
+ endure "github.com/spiral/endure/pkg/container"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/informer"
+ "github.com/spiral/roadrunner/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/resetter"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
+ "github.com/spiral/roadrunner/v2/tests/mocks"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestTEMP_INTI(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-test.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ &sqs.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestJobsInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-init.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("driver ready", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("driver ready", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("driver initialized", "driver", "amqp", "start", gomock.Any()).Times(2)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2-amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(2)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestJobsNoAMQPGlobal(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-init-no-amqp-global.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ &amqp.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
+}
+
+func TestJobsPauseResume(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-jobs-pause-resume-ephemeral.yaml",
+ Prefix: "rr",
+ }
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ // general
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "services", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Info("pipeline started", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+
+ mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-local", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &ephemeral.Plugin{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("ephemeralPause", ephemeralPause)
+ t.Run("pushToDisabledPipe", pushToDisabledPipe("test-local"))
+ t.Run("ephemeralResume", ephemeralResume)
+ t.Run("pushToEnabledPipe", pushToPipe("test-local"))
+
+ time.Sleep(time.Second * 1)
+
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
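+// ephemeralPause pauses the "test-local" pipeline via the jobs.Pause RPC method.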
+func ephemeralPause(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)}
+ pipe.GetPipelines()[0] = "test-local"
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Pause", pipe, er)
+ assert.NoError(t, err)
+}
+
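+// ephemeralResume resumes the "test-local" pipeline via the jobs.Resume RPC method.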
+func ephemeralResume(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ pipe := &jobsv1beta.MaintenanceRequest{Pipelines: make([]string, 1)}
+ pipe.GetPipelines()[0] = "test-local"
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Resume", pipe, er)
+ assert.NoError(t, err)
+}
+
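+// pushToDisabledPipe pushes a job to a paused pipeline and expects the jobs.Push call to fail.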
+func pushToDisabledPipe(pipeline string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
+ Job: "some/php/namespace",
+ Id: "1",
+ Payload: `{"hello":"world"}`,
+ Headers: nil,
+ Options: &jobsv1beta.Options{
+ Priority: 1,
+ Pipeline: pipeline,
+ },
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Push", req, er)
+ assert.Error(t, err)
+ }
+}
+
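+// pushToPipe pushes a job to an active pipeline and expects the jobs.Push call to succeed.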
+func pushToPipe(pipeline string) func(t *testing.T) {
+ return func(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
+ Job: "some/php/namespace",
+ Id: "1",
+ Payload: `{"hello":"world"}`,
+ Headers: nil,
+ Options: &jobsv1beta.Options{
+ Priority: 1,
+ Pipeline: pipeline,
+ },
+ }}
+
+ er := &jobsv1beta.Empty{}
+ err = client.Call("jobs.Push", req, er)
+ assert.NoError(t, err)
+ }
+}
diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go
index 61942516..5c26cbd0 100644
--- a/tests/plugins/resetter/test_plugin.go
+++ b/tests/plugins/resetter/test_plugin.go
@@ -9,7 +9,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
)
-var testPoolConfig = poolImpl.Config{
+var testPoolConfig = &poolImpl.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index f1c13734..e813e456 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -15,7 +15,7 @@ import (
const ConfigSection = "server"
const Response = "test"
-var testPoolConfig = pool.Config{
+var testPoolConfig = &pool.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index d0c72eae..b4a028d4 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -56,4 +56,4 @@ if ($env->getMode() === 'http') {
}
$factory->run();
-}
\ No newline at end of file
+}
diff --git a/tests/worker-cors.php b/tests/worker-cors.php
new file mode 100644
index 00000000..ea3c986c
--- /dev/null
+++ b/tests/worker-cors.php
@@ -0,0 +1,15 @@
+<?php
+
+use Spiral\RoadRunner\Worker;
+use Spiral\RoadRunner\Http\HttpWorker;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . '/vendor/autoload.php';
+
+$http = new HttpWorker(Worker::create());
+
+while ($req = $http->waitRequest()) {
+ $http->respond(200, 'Response', [
+ 'Access-Control-Allow-Origin' => ['*']
+ ]);
+}
diff --git a/utils/pointers.go b/utils/pointers.go
new file mode 100644
index 00000000..9c192279
--- /dev/null
+++ b/utils/pointers.go
@@ -0,0 +1,17 @@
+package utils
+
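+// AsUint64Ptr returns a pointer to val, substituting a default of 10 when val is 0.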
+func AsUint64Ptr(val uint64) *uint64 {
+ if val == 0 {
+ val = 10
+ }
+ return &val
+}
+
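+// AsStringPtr returns a pointer to val, or nil when val is an empty string.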
+func AsStringPtr(val string) *string {
+ if val == "" {
+ return nil
+ }
+ return &val
+}