summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
committerValery Piashchynski <[email protected]>2021-06-22 11:44:22 +0300
commit1a2a1f4735e40675abf6cd9767c99374359ec2bb (patch)
tree5abedf7306b50b02ba3892c0bc562307a62eb332
parent260d69c21fba6d763d05dc5693689ddf7ce7bfe2 (diff)
- Remove all old code, reformat, fix linters, return GA
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--.dockerignore2
-rwxr-xr-x.github/ISSUE_TEMPLATE/bug_report.md (renamed from --github/ISSUE_TEMPLATE/bug_report.md)0
-rwxr-xr-x.github/ISSUE_TEMPLATE/feature_request.md (renamed from --github/ISSUE_TEMPLATE/feature_request.md)0
-rw-r--r--.github/dependabot.yml (renamed from --github/dependabot.yml)0
-rw-r--r--.github/pull_request_template.md (renamed from --github/pull_request_template.md)0
-rw-r--r--.github/workflows/codeql-analysis.yml (renamed from --github/workflows/codeql-analysis.yml)0
-rw-r--r--.github/workflows/linters.yml (renamed from --github/workflows/linters.yml)0
-rw-r--r--.github/workflows/linux.yml (renamed from --github/workflows/linux.yml)0
-rw-r--r--.github/workflows/windows.yml (renamed from --github/workflows/windows.yml)0
-rw-r--r--plugins/jobs/config.go14
-rw-r--r--plugins/jobs/dispatcher.go2
-rw-r--r--plugins/jobs/dispatcher_test.go3
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go3
-rw-r--r--plugins/jobs/job.go6
-rw-r--r--plugins/jobs/job_options_test.go3
-rw-r--r--plugins/jobs/job_test.go3
-rw-r--r--plugins/jobs/oooold/broker.go47
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker.go216
-rw-r--r--plugins/jobs/oooold/broker/amqp/broker_test.go419
-rw-r--r--plugins/jobs/oooold/broker/amqp/conn.go232
-rw-r--r--plugins/jobs/oooold/broker/amqp/consume_test.go258
-rw-r--r--plugins/jobs/oooold/broker/amqp/durability_test.go728
-rw-r--r--plugins/jobs/oooold/broker/amqp/job.go56
-rw-r--r--plugins/jobs/oooold/broker/amqp/job_test.go29
-rw-r--r--plugins/jobs/oooold/broker/amqp/queue.go302
-rw-r--r--plugins/jobs/oooold/broker/amqp/stat_test.go63
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker.go185
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/broker_test.go276
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config.go50
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/config_test.go47
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/conn.go180
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/constants.go6
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/consume_test.go242
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/durability_test.go575
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/job.go24
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/sock.bean0
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/stat_test.go66
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube.go250
-rw-r--r--plugins/jobs/oooold/broker/beanstalk/tube_test.go18
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker.go174
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/broker_test.go221
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/consume_test.go253
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/queue.go161
-rw-r--r--plugins/jobs/oooold/broker/ephemeral/stat_test.go64
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker.go189
-rw-r--r--plugins/jobs/oooold/broker/sqs/broker_test.go275
-rw-r--r--plugins/jobs/oooold/broker/sqs/config.go82
-rw-r--r--plugins/jobs/oooold/broker/sqs/config_test.go48
-rw-r--r--plugins/jobs/oooold/broker/sqs/consume_test.go370
-rw-r--r--plugins/jobs/oooold/broker/sqs/durability_test.go588
-rw-r--r--plugins/jobs/oooold/broker/sqs/job.go80
-rw-r--r--plugins/jobs/oooold/broker/sqs/job_test.go19
-rw-r--r--plugins/jobs/oooold/broker/sqs/queue.go266
-rw-r--r--plugins/jobs/oooold/broker/sqs/stat_test.go60
-rw-r--r--plugins/jobs/oooold/broker_test.go314
-rw-r--r--plugins/jobs/oooold/config.go91
-rw-r--r--plugins/jobs/oooold/config_test.go158
-rw-r--r--plugins/jobs/oooold/dispatcher.go47
-rw-r--r--plugins/jobs/oooold/dispatcher_test.go53
-rw-r--r--plugins/jobs/oooold/event.go96
-rw-r--r--plugins/jobs/oooold/event_test.go52
-rw-r--r--plugins/jobs/oooold/job.go38
-rw-r--r--plugins/jobs/oooold/job_options.go70
-rw-r--r--plugins/jobs/oooold/job_options_test.go109
-rw-r--r--plugins/jobs/oooold/job_test.go18
-rw-r--r--plugins/jobs/oooold/pipeline.go169
-rw-r--r--plugins/jobs/oooold/pipeline_test.go89
-rw-r--r--plugins/jobs/oooold/rpc.go150
-rw-r--r--plugins/jobs/oooold/rpc_test.go657
-rw-r--r--plugins/jobs/oooold/service.go327
-rw-r--r--plugins/jobs/oooold/service_test.go458
-rw-r--r--plugins/jobs/pipeline_test.go3
-rw-r--r--plugins/jobs/plugin.go13
-rw-r--r--plugins/jobs/rpc.go1
-rw-r--r--proto/jobs/v1beta/jobs.proto22
-rw-r--r--proto/kv/v1beta/kv.pb.go5
-rw-r--r--proto/websockets/v1beta/websockets.pb.go5
78 files changed, 65 insertions, 10037 deletions
diff --git a/.dockerignore b/.dockerignore
index c28921c4..b817b3c8 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -2,7 +2,7 @@
.git
.gitignore
.editorconfig
---github
+.github
/src
/tests
/bin
diff --git a/--github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
index 448a1aa4..448a1aa4 100755
--- a/--github/ISSUE_TEMPLATE/bug_report.md
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
diff --git a/--github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md
index 470dd474..470dd474 100755
--- a/--github/ISSUE_TEMPLATE/feature_request.md
+++ b/.github/ISSUE_TEMPLATE/feature_request.md
diff --git a/--github/dependabot.yml b/.github/dependabot.yml
index 2c561205..2c561205 100644
--- a/--github/dependabot.yml
+++ b/.github/dependabot.yml
diff --git a/--github/pull_request_template.md b/.github/pull_request_template.md
index c3467850..c3467850 100644
--- a/--github/pull_request_template.md
+++ b/.github/pull_request_template.md
diff --git a/--github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index dc4a2349..dc4a2349 100644
--- a/--github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
diff --git a/--github/workflows/linters.yml b/.github/workflows/linters.yml
index cee7085c..cee7085c 100644
--- a/--github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
diff --git a/--github/workflows/linux.yml b/.github/workflows/linux.yml
index 62987771..62987771 100644
--- a/--github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
diff --git a/--github/workflows/windows.yml b/.github/workflows/windows.yml
index f23f9b5d..f23f9b5d 100644
--- a/--github/workflows/windows.yml
+++ b/.github/workflows/windows.yml
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
index 5c5ad400..4606ccba 100644
--- a/plugins/jobs/config.go
+++ b/plugins/jobs/config.go
@@ -8,8 +8,8 @@ import (
// Config defines settings for job broker, workers and job-pipeline mapping.
type Config struct {
// Workers configures roadrunner server and worker busy.
- //Workers *roadrunner.ServerConfig
- pool poolImpl.Config
+ // Workers *roadrunner.ServerConfig
+ poolCfg poolImpl.Config
// Dispatch defines where and how to match jobs.
Dispatch map[string]*Options
@@ -25,6 +25,16 @@ type Config struct {
route Dispatcher
}
+func (c *Config) InitDefaults() error {
+ const op = errors.Op("config_init_defaults")
+ var err error
+ c.pipelines, err = initPipelines(c.Pipelines)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
// MatchPipeline locates the pipeline associated with the job.
func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
const op = errors.Op("config_match_pipeline")
diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go
index 9fde8fac..8faf4db5 100644
--- a/plugins/jobs/dispatcher.go
+++ b/plugins/jobs/dispatcher.go
@@ -18,7 +18,7 @@ func initDispatcher(routes map[string]*Options) Dispatcher {
pattern = strings.Trim(pattern, "-.*")
for _, s := range separators {
- pattern = strings.Replace(pattern, s, ".", -1)
+ pattern = strings.ReplaceAll(pattern, s, ".")
}
dispatcher[pattern] = opts
diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go
index 59e3fd4e..9917642f 100644
--- a/plugins/jobs/dispatcher_test.go
+++ b/plugins/jobs/dispatcher_test.go
@@ -1,8 +1,9 @@
package jobs
import (
- "github.com/stretchr/testify/assert"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func Test_Map_All(t *testing.T) {
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index ee923d29..a1c1532c 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-21T13:57:33.772Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="eDaFKmf6xAQVYDDexY3m" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvrc5s4EP9rPONmJh7e4I9x7CTttVc3uZu2900GYWgBcUL4kb/+JBBPYcdNsJ32mkxra/VA+/jtrlZkoF6Hm1sMYu8DcmAwUCRnM1CnA0WRVVMz6ScjbXOSJUtSTlli3+HDKsKD/wg5sRiW+g5MGgMJQgHx4ybRRlEEbdKgAYzRujnMRUHzqTFYQoHwYINApH72HeIVu5MKPljPHfSXHhG6QlCM54TEAw5a10jqbKBeY4RI/i3cXMOASbAQTT7vZkdvuTcMI3LIhC9RtLmTUQhXjyQybz/BfyT7UtHyZVYgSDnTfLdkW0gBozRyIFtFGqiTtecT+BADm/WuqeYpzSNhQFsy/er6QXCNAoSzuaoDoOXalJ4QjL7DWo9hW3Dh0h6RD87aCmICNzUS5+sWUiYI3tIhvFe1uIy5oY3NwoLWldqUYpBXV1ihHcBNZVkuXomSfuHS/BHJCnKEDjUu3kSYeGiJIhDMKuqkKelqzHuEYi7fb5CQLUcKSAlqSp+KEG+/sPkjvWh+rfdNN3zxvLXlrZ06SFCKbbiPTc4nAXgJyZ6BfEEmhL0qxTAAxF81EdilHT51jny659IUNE1vmoJutjSc75RPq+NFWKllVPq4tVIuHGGlK4zBtjYsZgMSwaBKnp9vY+pZbGzjk5qJ0dbXwqbo98rAWGNbt7aT2qV8oFlqpzLLljGpxmFm2ZepFD618vP3c7qSdEn/zf9+uKOL5h90Q9vI7jcI6NBytK4gYCkL1TB6CgJmS8SaIgYBWesIAuqxYsD4rPiso7PE6o/hUzoaPrVDw4Z0GoAqbespc4pTAfS3M3+5sRgn8ubqeGSNDV2XTUWjB51x03SUdlJ5ZNMRc/gPgNge8+xv57P3b/+c9erOoUwdutnlzseGqYK+3Hk7p1e73LnS4c6PltKb50So3HDn5mvz5/qBEO0doS/SqPXT+Vy5oc5Ku2dTaO/nuhcpVBdc4UAxArr/SdzQs/Fvyuouk5Cy50cD9Yr2SvGG/p8JTcrpl4SplPVptT7qwMglCPwln2dT2ULc7HagjTDlF/ExzGRw4EewejT9tuSf2QYXBeEdWlCxXD0QnNokxbAYQAWyaE+itLhN83Cb8izeA+iSvNNine1t011mM6bZ2tiPliUj5fPnYBsg4Dw57mPMJJWU48p2F5MtwLKI4tsguMoVMs1UNuHqmeZMTBAd5QZZ9Y0FMxbTUEQ4QmWFt29A6AcMOXcwWEG26t446Orst7O2lf3wVWv0/Kef+GjI7fKE1BEfjY742C5j9AY+4xcAX2F6rx5/cz+GOU9PgGtKHe/2yVFXhMAwJhUGs1x1N5PHYOmeWWFzu7VdLASxnmpff/khRCl5zqZ+O6us1xJqqYc6K1k+lreSZUE9Z0vnD87mlVOVT6UD07/e66cv06kkhKB7ilw4yg7Ew29o8abXo7Br2dDuvN5aWLqm79XB4eiR9fZReHzgUfholU1ZEQSNIQ2WUT17awmaxeamNAvHx4O4Omn7x9B3nBx3MPEfwSJbikmV36nQdfXJQJ+ytSjUEu4jBYVEKIL96EIRyhKWqIvxKasSslj+iWluMFBYeR/9yrrQ2rhQDFEXXbe+x9OFeP6s5WmXNC8AIfMoPDug+UdMhcZzMn3qM9G7zPeYk4E5/b9orQNB8mkhdNbK3uuuvRc19acLQVbfqcCziu9yO+Yd+95GPPWGkD6RibuC/i4//BPW4eXzJx/nvVd93XC1DoRrobEzw1U68V1ZIZ8aXG8h2+6EYYgVklyMQiZAD2bML/2ERlbIipiLbAgrkax8wPE9yucNez5XuK7Sfa5wjIWhHwvaUlck7oJ2++WV/t6aEw9w8zTxhlVGm5/hJIpO+via3nqU/nleWrTGh0pfO5r0xbeEdoj318hEjbb9a6IG9FMmoop4rBZEj1LCkorr8p1nSazd0d8b9tjJEgPHh1UfF17L4MvhnZJ2QOKVSCq0+h4sYDCnWsrL7NMFIoT6TVHtzRJnaR119HVUN4tYzh4Jkjhn1PU3bB+5wUA8W8HcbjIb8UDMJoSbJXsbfQTWiTZKk+xZfZiKJDdNxVRFUzEt0VQKWv+mooqe8m5OCZ8/3v8xuxfMphBQjJENk+Rpp7gA9vdl5kY/5ha3I1SdpASmqy2ompo10g90lz1UGx8N11mr1o32Pbz6eptu5ub47lLUwOnuuzousvLSQiC2qrJCq/7Qau240SgvvajLSdIwS5KGeS705kx3Y52iEi5z9skxGyVI8YEAMrwoSjYszxhe0IQjGbGOPP+AGKMG3/1cQR2Loyx7uqiKUIyFi3d5DjUsLgOf4muHK8GZW677DPHqqsvJ7AB/h4vYUxIftxxC+QJnzR2UrwQ33MEz0ifarP5QJz+KVH/zpM7+Aw==</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-22T07:34:31.801Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="W3ry_giS-Ii3LUJqzQzg" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfFr7mIdvTKRLwd1Es266nhqh+DFhIY7mED+geInwBYmtnd9oBa4y0WX3pMi8rVV9f1a/+Bw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
index 060326c8..d013d320 100644
--- a/plugins/jobs/interface.go
+++ b/plugins/jobs/interface.go
@@ -1,8 +1,9 @@
package jobs
-
// todo naming
type Consumer interface {
Push()
Stat()
+ Consume(*Pipeline)
+ Register(*Pipeline)
}
diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go
index 8458b25b..79bb8ad8 100644
--- a/plugins/jobs/job.go
+++ b/plugins/jobs/job.go
@@ -5,12 +5,6 @@ import (
"github.com/spiral/roadrunner/v2/utils"
)
-//// Handler handles job execution.
-//type Handler func(id string, j *Job) error
-//
-//// ErrorHandler handles job execution errors.
-//type ErrorHandler func(id string, j *Job, err error)
-
// Job carries information about single job.
type Job struct {
// Job contains name of job broker (usually PHP class).
diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go
index 8caaa935..d226fa1e 100644
--- a/plugins/jobs/job_options_test.go
+++ b/plugins/jobs/job_options_test.go
@@ -1,9 +1,10 @@
package jobs
import (
- "github.com/stretchr/testify/assert"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
func TestOptions_CanRetry(t *testing.T) {
diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go
index e1938eca..1f4bf918 100644
--- a/plugins/jobs/job_test.go
+++ b/plugins/jobs/job_test.go
@@ -1,8 +1,9 @@
package jobs
import (
- "github.com/stretchr/testify/assert"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func TestJob_Body(t *testing.T) {
diff --git a/plugins/jobs/oooold/broker.go b/plugins/jobs/oooold/broker.go
deleted file mode 100644
index d49616e7..00000000
--- a/plugins/jobs/oooold/broker.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package oooold
-
-// Broker manages set of pipelines and provides ability to push jobs into them.
-type Broker interface {
- // Register broker pipeline.
- Register(pipe *Pipeline) error
-
- // Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
- // the service is started!
- Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error
-
- // Push job into the worker.
- Push(pipe *Pipeline, j *Job) (string, error)
-
- // Stat must fetch statistics about given pipeline or return error.
- Stat(pipe *Pipeline) (stat *Stat, err error)
-}
-
-// EventProvider defines the ability to throw events for the broker.
-type EventProvider interface {
- // Listen attaches the even listener.
- Listen(lsn func(event int, ctx interface{}))
-}
-
-// Stat contains information about pipeline.
-type Stat struct {
- // Pipeline name.
- Pipeline string
-
- // Broken is name of associated broker.
- Broker string
-
- // InternalName defines internal broker specific pipeline name.
- InternalName string
-
- // Consuming indicates that pipeline is pipelines jobs.
- Consuming bool
-
- // testQueue defines number of pending jobs.
- Queue int64
-
- // Active defines number of jobs which are currently being processed.
- Active int64
-
- // Delayed defines number of jobs which are being processed.
- Delayed int64
-}
diff --git a/plugins/jobs/oooold/broker/amqp/broker.go b/plugins/jobs/oooold/broker/amqp/broker.go
deleted file mode 100644
index b47d83ee..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker.go
+++ /dev/null
@@ -1,216 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
-)
-
-// Broker represents AMQP broker.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- publish *chanPool
- consume *chanPool
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures AMQP job broker (always 2 connections).
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.publish.Close()
-
- if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
- b.mu.Unlock()
- return err
- }
- defer b.consume.Close()
-
- for _, q := range b.queues {
- err := q.declare(b.publish, q.name, q.key, nil)
- if err != nil {
- b.mu.Unlock()
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.publish, b.consume)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.publish != nil && q.execPool != nil {
- if q.execPool != nil {
- go q.serve(b.publish, b.consume)
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if err := q.publish(b.publish, id.String(), 0, j, j.Options.DelayDuration()); err != nil {
- return "", err
- }
-
- return id.String(), nil
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- queue, err := q.inspect(b.publish)
- if err != nil {
- return nil, err
- }
-
- // this the closest approximation we can get for now
- return &jobs.Stat{
- InternalName: queue.Name,
- Queue: int64(queue.Messages),
- Active: int64(atomic.LoadInt32(&q.running)),
- }, nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/broker_test.go b/plugins/jobs/oooold/broker/amqp/broker_test.go
deleted file mode 100644
index 66078099..00000000
--- a/plugins/jobs/oooold/broker/amqp/broker_test.go
+++ /dev/null
@@ -1,419 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "queue": "rr-queue",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }
-
- cfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-var (
- fanoutPipe = &jobs.Pipeline{
- "broker": "amqp",
- "name": "fanout",
- "queue": "fanout-queue",
- "exchange": "fanout-exchange",
- "exchange-type": "fanout",
- "prefetch": 1,
- }
-
- fanoutCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5672/",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_BadPipeline(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "amqp",
- "name": "default",
- "exchange": "rr-exchange",
- "prefetch": 1,
- }))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_CantStart(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "amqp://guest:guest@localhost:15672/",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal()
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_Queue_RoutingKey(t *testing.T) {
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.Equal(t, pipeWithKey.String("routing-key", ""), "rr-exchange-routing-key")
-}
-
-func TestBroker_Register_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_RoutingKey(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("routing-key", "rr-exchange-routing-key")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Queue_ExchangeType(t *testing.T) {
- pipeWithKey := pipe.With("exchange-type", "direct")
-
- assert.Equal(t, pipeWithKey.String("exchange-type", ""), "direct")
-}
-
-func TestBroker_Register_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "fanout")
-
- assert.NoError(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Register_With_WrongExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := pipe.With("exchange-type", "xxx")
-
- assert.Error(t, b.Register(&pipeWithKey))
-}
-
-func TestBroker_Consume_With_ExchangeType(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(fanoutCfg)
- if err != nil {
- t.Fatal(err)
- }
-
- pipeWithKey := fanoutPipe.With("exchange-type", "fanout")
-
- err = b.Register(&pipeWithKey)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(&pipeWithKey, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(&pipeWithKey, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
diff --git a/plugins/jobs/oooold/broker/amqp/conn.go b/plugins/jobs/oooold/broker/amqp/conn.go
deleted file mode 100644
index be747776..00000000
--- a/plugins/jobs/oooold/broker/amqp/conn.go
+++ /dev/null
@@ -1,232 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/cenkalti/backoff/v4"
- "github.com/streadway/amqp"
- "sync"
- "time"
-)
-
-// manages set of AMQP channels
-type chanPool struct {
- // timeout to backoff redial
- tout time.Duration
- url string
-
- mu *sync.Mutex
-
- conn *amqp.Connection
- channels map[string]*channel
- wait chan interface{}
- connected chan interface{}
-}
-
-// manages single channel
-type channel struct {
- ch *amqp.Channel
- // todo unused
- //consumer string
- confirm chan amqp.Confirmation
- signal chan error
-}
-
-// newConn creates new watched AMQP connection
-func newConn(url string, tout time.Duration) (*chanPool, error) {
- conn, err := dial(url)
- if err != nil {
- return nil, err
- }
-
- cp := &chanPool{
- url: url,
- tout: tout,
- conn: conn,
- mu: &sync.Mutex{},
- channels: make(map[string]*channel),
- wait: make(chan interface{}),
- connected: make(chan interface{}),
- }
-
- close(cp.connected)
- go cp.watch()
- return cp, nil
-}
-
-// dial dials to AMQP.
-func dial(url string) (*amqp.Connection, error) {
- return amqp.Dial(url)
-}
-
-// Close gracefully closes all underlying channels and connection.
-func (cp *chanPool) Close() error {
- cp.mu.Lock()
-
- close(cp.wait)
- if cp.channels == nil {
- return fmt.Errorf("connection is dead")
- }
-
- // close all channels and consume
- var wg sync.WaitGroup
- for _, ch := range cp.channels {
- wg.Add(1)
-
- go func(ch *channel) {
- defer wg.Done()
- cp.closeChan(ch, nil)
- }(ch)
- }
- cp.mu.Unlock()
-
- wg.Wait()
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn != nil {
- return cp.conn.Close()
- }
-
- return nil
-}
-
-// waitConnected waits till connection is connected again or eventually closed.
-// must only be invoked after connection error has been delivered to channel.signal.
-func (cp *chanPool) waitConnected() chan interface{} {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- return cp.connected
-}
-
-// watch manages connection state and reconnects if needed
-func (cp *chanPool) watch() {
- for {
- select {
- case <-cp.wait:
- // connection has been closed
- return
- // here we are waiting for the errors from amqp connection
- case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
- cp.mu.Lock()
- // clear connected, since connections are dead
- cp.connected = make(chan interface{})
-
- // broadcast error to all consume to let them for the tryReconnect
- for _, ch := range cp.channels {
- ch.signal <- err
- }
-
- // disable channel allocation while server is dead
- cp.conn = nil
- cp.channels = nil
-
- // initialize the backoff
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cp.tout
- cp.mu.Unlock()
-
- // reconnect function
- reconnect := func() error {
- cp.mu.Lock()
- conn, err := dial(cp.url)
- if err != nil {
- // still failing
- fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
- cp.mu.Unlock()
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------amqp successfully redialed------")
-
- // here we are reconnected
- // replace the connection
- cp.conn = conn
- // re-init the channels
- cp.channels = make(map[string]*channel)
- cp.mu.Unlock()
- return nil
- }
-
- // start backoff retry
- errb := backoff.Retry(reconnect, expb)
- if errb != nil {
- fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
- // reconnection failed
- close(cp.connected)
- return
- }
- close(cp.connected)
- }
- }
-}
-
-// channel allocates new channel on amqp connection
-func (cp *chanPool) channel(name string) (*channel, error) {
- cp.mu.Lock()
- dead := cp.conn == nil
- cp.mu.Unlock()
-
- if dead {
- // wait for connection restoration (doubled the timeout duration)
- select {
- case <-time.NewTimer(cp.tout * 2).C:
- return nil, fmt.Errorf("connection is dead")
- case <-cp.connected:
- // connected
- }
- }
-
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- if cp.conn == nil {
- return nil, fmt.Errorf("connection has been closed")
- }
-
- if ch, ok := cp.channels[name]; ok {
- return ch, nil
- }
-
- // we must create new channel
- ch, err := cp.conn.Channel()
- if err != nil {
- return nil, err
- }
-
- // Enable publish confirmations
- if err = ch.Confirm(false); err != nil {
- return nil, fmt.Errorf("unable to enable confirmation mode on channel: %s", err)
- }
-
- // we expect that every allocated channel would have listener on signal
- // this is not true only in case of pure producing channels
- cp.channels[name] = &channel{
- ch: ch,
- confirm: ch.NotifyPublish(make(chan amqp.Confirmation, 1)),
- signal: make(chan error, 1),
- }
-
- return cp.channels[name], nil
-}
-
-// closeChan gracefully closes and removes channel allocation.
-func (cp *chanPool) closeChan(c *channel, err error) error {
- cp.mu.Lock()
- defer cp.mu.Unlock()
-
- go func() {
- c.signal <- nil
- c.ch.Close()
- }()
-
- for name, ch := range cp.channels {
- if ch == c {
- delete(cp.channels, name)
- }
- }
-
- return err
-}
diff --git a/plugins/jobs/oooold/broker/amqp/consume_test.go b/plugins/jobs/oooold/broker/amqp/consume_test.go
deleted file mode 100644
index 28999c36..00000000
--- a/plugins/jobs/oooold/broker/amqp/consume_test.go
+++ /dev/null
@@ -1,258 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- start := time.Now()
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 3*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/durability_test.go b/plugins/jobs/oooold/broker/amqp/durability_test.go
deleted file mode 100644
index 00d62c51..00000000
--- a/plugins/jobs/oooold/broker/amqp/durability_test.go
+++ /dev/null
@@ -1,728 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "amqp://guest:guest@localhost:5673/",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:5673",
- upstream: "localhost:5672",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2_2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // start when connection is dead
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- proxy.reset(false)
-
- _, serr := b.Stat(pipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
- if perr != nil {
- panic(perr)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- ch, err := b.consume.channel("purger")
- if err != nil {
- panic(err)
- }
- _, err = ch.ch.QueuePurge("rr-queue", false)
- if err != nil {
- panic(err)
- }
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
-
- if j.Payload == "kill" && len(done) == 0 {
- proxy.reset(true)
- }
-
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job.go b/plugins/jobs/oooold/broker/amqp/job.go
deleted file mode 100644
index bd559715..00000000
--- a/plugins/jobs/oooold/broker/amqp/job.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package amqp
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
-)
-
-// pack job metadata into headers
-func pack(id string, attempt int, j *jobs.Job) amqp.Table {
- return amqp.Table{
- "rr-id": id,
- "rr-job": j.Job,
- "rr-attempt": int64(attempt),
- "rr-maxAttempts": int64(j.Options.Attempts),
- "rr-timeout": int64(j.Options.Timeout),
- "rr-delay": int64(j.Options.Delay),
- "rr-retryDelay": int64(j.Options.RetryDelay),
- }
-}
-
-// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, attempt int, j *jobs.Job, err error) {
- j = &jobs.Job{Payload: string(d.Body), Options: &jobs.Options{}}
-
- if _, ok := d.Headers["rr-id"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-id")
- }
-
- if _, ok := d.Headers["rr-attempt"].(int64); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-attempt")
- }
-
- if _, ok := d.Headers["rr-job"].(string); !ok {
- return "", 0, nil, fmt.Errorf("missing header `%s`", "rr-job")
- }
- j.Job = d.Headers["rr-job"].(string)
-
- if _, ok := d.Headers["rr-maxAttempts"].(int64); ok {
- j.Options.Attempts = int(d.Headers["rr-maxAttempts"].(int64))
- }
-
- if _, ok := d.Headers["rr-timeout"].(int64); ok {
- j.Options.Timeout = int(d.Headers["rr-timeout"].(int64))
- }
-
- if _, ok := d.Headers["rr-delay"].(int64); ok {
- j.Options.Delay = int(d.Headers["rr-delay"].(int64))
- }
-
- if _, ok := d.Headers["rr-retryDelay"].(int64); ok {
- j.Options.RetryDelay = int(d.Headers["rr-retryDelay"].(int64))
- }
-
- return d.Headers["rr-id"].(string), int(d.Headers["rr-attempt"].(int64)), j, nil
-}
diff --git a/plugins/jobs/oooold/broker/amqp/job_test.go b/plugins/jobs/oooold/broker/amqp/job_test.go
deleted file mode 100644
index 24ca453b..00000000
--- a/plugins/jobs/oooold/broker/amqp/job_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package amqp
-
-import (
- "github.com/streadway/amqp"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack_Errors(t *testing.T) {
- _, _, _, err := unpack(amqp.Delivery{
- Headers: map[string]interface{}{},
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- },
- })
- assert.Error(t, err)
-
- _, _, _, err = unpack(amqp.Delivery{
- Headers: map[string]interface{}{
- "rr-id": "id",
- "rr-attempt": int64(0),
- },
- })
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/amqp/queue.go b/plugins/jobs/oooold/broker/amqp/queue.go
deleted file mode 100644
index 6ef5f20f..00000000
--- a/plugins/jobs/oooold/broker/amqp/queue.go
+++ /dev/null
@@ -1,302 +0,0 @@
-package amqp
-
-import (
- "errors"
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/streadway/amqp"
- "os"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type ExchangeType string
-
-const (
- Direct ExchangeType = "direct"
- Fanout ExchangeType = "fanout"
- Topic ExchangeType = "topic"
- Headers ExchangeType = "headers"
-)
-
-func (et ExchangeType) IsValid() error {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return nil
- }
- return errors.New("unknown exchange-type")
-}
-
-func (et ExchangeType) String() string {
- switch et {
- case Direct, Fanout, Topic, Headers:
- return string(et)
- default:
- return "direct"
- }
-}
-
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- exchange string
- exchangeType ExchangeType
- name, key string
- consumer string
-
- // active consuming channel
- muc sync.Mutex
- cc *channel
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- running int32
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-// newQueue creates new queue wrapper for AMQP.
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on amqp pipeline")
- }
-
- exchangeType := ExchangeType(pipe.String("exchange-type", "direct"))
-
- err := exchangeType.IsValid()
- if err != nil {
- return nil, fmt.Errorf(err.Error())
- }
-
- return &queue{
- exchange: pipe.String("exchange", "amqp.direct"),
- exchangeType: exchangeType,
- name: pipe.String("queue", ""),
- key: pipe.String("routing-key", pipe.String("queue", "")),
- consumer: pipe.String("consumer", fmt.Sprintf("rr-jobs:%s-%v", pipe.Name(), os.Getpid())),
- pipe: pipe,
- lsn: lsn,
- }, nil
-}
-
-// serve consumes queue
-func (q *queue) serve(publish, consume *chanPool) {
- atomic.StoreInt32(&q.active, 1)
-
- for {
- <-consume.waitConnected()
- if atomic.LoadInt32(&q.active) == 0 {
- // stopped
- return
- }
-
- delivery, cc, err := q.consume(consume)
- if err != nil {
- q.report(err)
- continue
- }
-
- q.muc.Lock()
- q.cc = cc
- q.muc.Unlock()
-
- for d := range delivery {
- q.muw.Lock()
- q.wg.Add(1)
- q.muw.Unlock()
-
- atomic.AddInt32(&q.running, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, d amqp.Delivery) {
- err := q.do(publish, h, d)
-
- atomic.AddInt32(&q.running, ^int32(0))
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, d)
- }
- }
-}
-
-func (q *queue) consume(consume *chanPool) (jobs <-chan amqp.Delivery, cc *channel, err error) {
- // allocate channel for the consuming
- if cc, err = consume.channel(q.name); err != nil {
- return nil, nil, err
- }
-
- if err := cc.ch.Qos(q.pipe.Integer("prefetch", 4), 0, false); err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- delivery, err := cc.ch.Consume(q.name, q.consumer, false, false, false, false, nil)
- if err != nil {
- return nil, nil, consume.closeChan(cc, err)
- }
-
- // do i like it?
- go func(consume *chanPool) {
- for err := range cc.signal {
- consume.closeChan(cc, err)
- return
- }
- }(consume)
-
- return delivery, cc, err
-}
-
-func (q *queue) do(cp *chanPool, h jobs.Handler, d amqp.Delivery) error {
- id, attempt, j, err := unpack(d)
- if err != nil {
- q.report(err)
- return d.Nack(false, false)
- }
- err = h(id, j)
-
- if err == nil {
- return d.Ack(false)
- }
-
- // failed
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return d.Nack(false, false)
- }
-
- // retry as new j (to accommodate attempt number and new delay)
- if err = q.publish(cp, id, attempt+1, j, j.Options.RetryDuration()); err != nil {
- q.report(err)
- return d.Nack(false, true)
- }
-
- return d.Ack(false)
-}
-
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- q.muc.Lock()
- if q.cc != nil {
- // gracefully stopped consuming
- q.report(q.cc.ch.Cancel(q.consumer, true))
- }
- q.muc.Unlock()
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// publish message to queue or to delayed queue.
-func (q *queue) publish(cp *chanPool, id string, attempt int, j *jobs.Job, delay time.Duration) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- qKey := q.key
-
- if delay != 0 {
- delayMs := int64(delay.Seconds() * 1000)
- qName := fmt.Sprintf("delayed-%d.%s.%s", delayMs, q.exchange, q.name)
- qKey = qName
-
- err := q.declare(cp, qName, qName, amqp.Table{
- "x-dead-letter-exchange": q.exchange,
- "x-dead-letter-routing-key": q.name,
- "x-message-ttl": delayMs,
- "x-expires": delayMs * 2,
- })
-
- if err != nil {
- return err
- }
- }
-
- err = c.ch.Publish(
- q.exchange, // exchange
- qKey, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/octet-stream",
- Body: j.Body(),
- DeliveryMode: amqp.Persistent,
- Headers: pack(id, attempt, j),
- },
- )
-
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- confirmed, ok := <-c.confirm
- if ok && confirmed.Ack {
- return nil
- }
-
- return fmt.Errorf("failed to publish: %v", confirmed.DeliveryTag)
-}
-
-// declare queue and binding to it
-func (q *queue) declare(cp *chanPool, queue string, key string, args amqp.Table) error {
- c, err := cp.channel(q.name)
- if err != nil {
- return err
- }
-
- err = c.ch.ExchangeDeclare(q.exchange, q.exchangeType.String(), true, false, false, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- _, err = c.ch.QueueDeclare(queue, true, false, false, false, args)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- err = c.ch.QueueBind(queue, key, q.exchange, false, nil)
- if err != nil {
- return cp.closeChan(c, err)
- }
-
- // keep channel open
- return err
-}
-
-// inspect the queue
-func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
- c, err := cp.channel("stat")
- if err != nil {
- return nil, err
- }
-
- queue, err := c.ch.QueueInspect(q.name)
- if err != nil {
- return nil, cp.closeChan(c, err)
- }
-
- // keep channel open
- return &queue, err
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/amqp/stat_test.go b/plugins/jobs/oooold/broker/amqp/stat_test.go
deleted file mode 100644
index ef19746c..00000000
--- a/plugins/jobs/oooold/broker/amqp/stat_test.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package amqp
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "sync"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- wg := &sync.WaitGroup{}
- wg.Add(1)
- exec <- func(id string, j *jobs.Job) error {
- defer wg.Done()
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- return nil
- }
-
- wg.Wait()
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker.go b/plugins/jobs/oooold/broker/beanstalk/broker.go
deleted file mode 100644
index dc3ea518..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run consume using Broker service.
-type Broker struct {
- cfg *Config
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- conn *conn
- tubes map[*jobs.Pipeline]*tube
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init(cfg *Config) (bool, error) {
- b.cfg = cfg
- b.tubes = make(map[*jobs.Pipeline]*tube)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.tubes[pipe]; ok {
- return fmt.Errorf("tube `%s` has already been registered", pipe.Name())
- }
-
- t, err := newTube(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.tubes[pipe] = t
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- if b.conn, err = b.cfg.newConn(); err != nil {
- return err
- }
- defer b.conn.Close()
-
- for _, t := range b.tubes {
- tt := t
- if tt.execPool != nil {
- go tt.serve(b.cfg)
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, t := range b.tubes {
- t.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to reset consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- t.stop()
-
- t.execPool = execPool
- t.errHandler = errHandler
-
- if b.conn != nil {
- tt := t
- if tt.execPool != nil {
- go tt.serve(connFactory(b.cfg))
- }
- }
-
- return nil
-}
-
-// Push data into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return "", fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- data, err := pack(j)
- if err != nil {
- return "", err
- }
-
- return t.put(b.conn, 0, data, j.Options.DelayDuration(), j.Options.TimeoutDuration())
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- t := b.tube(pipe)
- if t == nil {
- return nil, fmt.Errorf("undefined tube `%s`", pipe.Name())
- }
-
- return t.stat(b.conn)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) tube(pipe *jobs.Pipeline) *tube {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- t, ok := b.tubes[pipe]
- if !ok {
- return nil
- }
-
- return t
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/broker_test.go b/plugins/jobs/oooold/broker/beanstalk/broker_test.go
deleted file mode 100644
index cd2132af..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/broker_test.go
+++ /dev/null
@@ -1,276 +0,0 @@
-package beanstalk
-
-import (
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- "tube": "test",
- }
-
- cfg = &Config{
- Addr: "tcp://localhost:11300",
- }
-)
-
-func init() {
- conn, err := beanstalk.Dial("tcp", "localhost:11300")
- if err != nil {
- panic(err)
- }
- defer conn.Close()
-
- t := beanstalk.Tube{Name: "testTube", Conn: conn}
-
- for {
- id, _, err := t.PeekReady()
- if id == 0 || err != nil {
- break
- }
-
- if err := conn.Delete(id); err != nil {
- panic(err)
- }
- }
-}
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Invalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "beanstalk",
- "name": "default",
- }))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Error(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(&Config{
- Addr: "tcp://localhost:11399",
- })
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config.go b/plugins/jobs/oooold/broker/beanstalk/config.go
deleted file mode 100644
index 3e48a2d7..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "strings"
- "time"
-)
-
-// Config defines beanstalk broker configuration.
-type Config struct {
- // Addr of beanstalk server.
- Addr string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("beanstalk address is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// size creates new rpc socket Listener.
-func (c *Config) newConn() (*conn, error) {
- dsn := strings.Split(c.Addr, "://")
- if len(dsn) != 2 {
- return nil, fmt.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock)")
- }
-
- return newConn(dsn[0], dsn[1], c.TimeoutDuration())
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/config_test.go b/plugins/jobs/oooold/broker/beanstalk/config_test.go
deleted file mode 100644
index 4ba08a04..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/config_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package beanstalk
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func TestConfig_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func TestConfig_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"addr":"tcp"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
-
-func TestConfig_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"addr":"unix://sock.bean"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, err := c.newConn()
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/conn.go b/plugins/jobs/oooold/broker/beanstalk/conn.go
deleted file mode 100644
index 7aba6bbb..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/conn.go
+++ /dev/null
@@ -1,180 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "strings"
- "sync"
- "time"
-)
-
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
-
-// creates new connections
-type connFactory interface {
- newConn() (*conn, error)
-}
-
-// conn protects allocation for one connection between
-// threads and provides reconnecting capabilities.
-type conn struct {
- tout time.Duration
- conn *beanstalk.Conn
- alive bool
- free chan interface{}
- dead chan interface{}
- stop chan interface{}
- lock *sync.Cond
-}
-
-// creates new beanstalk connection and reconnect watcher.
-func newConn(network, addr string, tout time.Duration) (cn *conn, err error) {
- cn = &conn{
- tout: tout,
- alive: true,
- free: make(chan interface{}, 1),
- dead: make(chan interface{}, 1),
- stop: make(chan interface{}),
- lock: sync.NewCond(&sync.Mutex{}),
- }
-
- cn.conn, err = beanstalk.Dial(network, addr)
- if err != nil {
- return nil, err
- }
-
- go cn.watch(network, addr)
-
- return cn, nil
-}
-
-// reset the connection and reconnect watcher.
-func (cn *conn) Close() error {
- cn.lock.L.Lock()
- defer cn.lock.L.Unlock()
-
- close(cn.stop)
- for cn.alive {
- cn.lock.Wait()
- }
-
- return nil
-}
-
-// acquire connection instance or return error in case of timeout. When mandratory set to true
-// timeout won't be applied.
-func (cn *conn) acquire(mandatory bool) (*beanstalk.Conn, error) {
- // do not apply timeout on mandatory connections
- if mandatory {
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- }
- }
-
- select {
- case <-cn.stop:
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- return cn.conn, nil
- default:
- // *2 to handle commands called right after the connection reset
- tout := time.NewTimer(cn.tout * 2)
- select {
- case <-cn.stop:
- tout.Stop()
- return nil, fmt.Errorf("connection closed")
- case <-cn.free:
- tout.Stop()
- return cn.conn, nil
- case <-tout.C:
- return nil, fmt.Errorf("unable to allocate connection (timeout %s)", cn.tout)
- }
- }
-}
-
-// release acquired connection.
-func (cn *conn) release(err error) error {
- if isConnError(err) {
- // reconnect is required
- cn.dead <- err
- } else {
- cn.free <- nil
- }
-
- return err
-}
-
-// watch and reconnect if dead
-func (cn *conn) watch(network, addr string) {
- cn.free <- nil
- t := time.NewTicker(WatchThrottleLimit)
- defer t.Stop()
- for {
- select {
- case <-cn.dead:
- // simple throttle limiter
- <-t.C
- // try to reconnect
- // TODO add logging here
- expb := backoff.NewExponentialBackOff()
- expb.MaxInterval = cn.tout
-
- reconnect := func() error {
- conn, err := beanstalk.Dial(network, addr)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
- return err
- }
-
- // TODO ADD LOGGING
- fmt.Println("------beanstalk successfully redialed------")
-
- cn.conn = conn
- cn.free <- nil
- return nil
- }
-
- err := backoff.Retry(reconnect, expb)
- if err != nil {
- fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
- cn.dead <- nil
- }
-
- case <-cn.stop:
- cn.lock.L.Lock()
- select {
- case <-cn.dead:
- case <-cn.free:
- }
-
- // stop underlying connection
- cn.conn.Close()
- cn.alive = false
- cn.lock.Signal()
-
- cn.lock.L.Unlock()
-
- return
- }
- }
-}
-
-// isConnError indicates that error is related to dead socket.
-func isConnError(err error) bool {
- if err == nil {
- return false
- }
-
- for _, errStr := range connErrors {
- // golang...
- if strings.Contains(err.Error(), errStr) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/constants.go b/plugins/jobs/oooold/broker/beanstalk/constants.go
deleted file mode 100644
index 84be305e..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/constants.go
+++ /dev/null
@@ -1,6 +0,0 @@
-package beanstalk
-
-import "time"
-
-// WatchThrottleLimit is used to limit reconnection occurrence in watch function
-const WatchThrottleLimit = time.Second \ No newline at end of file
diff --git a/plugins/jobs/oooold/broker/beanstalk/consume_test.go b/plugins/jobs/oooold/broker/beanstalk/consume_test.go
deleted file mode 100644
index b16866ae..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/consume_test.go
+++ /dev/null
@@ -1,242 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed >= time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/durability_test.go b/plugins/jobs/oooold/broker/beanstalk/durability_test.go
deleted file mode 100644
index 499a5206..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/durability_test.go
+++ /dev/null
@@ -1,575 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Addr: "tcp://localhost:11301",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:11301",
- upstream: "localhost:11300",
- accept: true,
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // reoccuring
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NotEqual(t, "0", jid)
-
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(true)
-
- // auto-reconnect
- _, serr = b.Stat(pipe)
- assert.NoError(t, serr)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(2)
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(pipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- done[id] = true
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- st, err := b.Stat(pipe)
- if err != nil {
- continue
- }
-
- // wait till pipeline is empty
- if st.Queue+st.Active == 0 {
- return
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(2).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/job.go b/plugins/jobs/oooold/broker/beanstalk/job.go
deleted file mode 100644
index fd9c8c3c..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/job.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package beanstalk
-
-import (
- "bytes"
- "encoding/gob"
- "github.com/spiral/jobs/v2"
-)
-
-func pack(j *jobs.Job) ([]byte, error) {
- b := new(bytes.Buffer)
- err := gob.NewEncoder(b).Encode(j)
- if err != nil {
- return nil, err
- }
-
- return b.Bytes(), nil
-}
-
-func unpack(data []byte) (*jobs.Job, error) {
- j := &jobs.Job{}
- err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(j)
-
- return j, err
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/sock.bean b/plugins/jobs/oooold/broker/beanstalk/sock.bean
deleted file mode 100644
index e69de29b..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/sock.bean
+++ /dev/null
diff --git a/plugins/jobs/oooold/broker/beanstalk/stat_test.go b/plugins/jobs/oooold/broker/beanstalk/stat_test.go
deleted file mode 100644
index 14a55859..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/stat_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package beanstalk
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- b.Register(pipe)
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // beanstalk reserves job right after push
- time.Sleep(time.Millisecond * 100)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube.go b/plugins/jobs/oooold/broker/beanstalk/tube.go
deleted file mode 100644
index 9d7ad117..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube.go
+++ /dev/null
@@ -1,250 +0,0 @@
-package beanstalk
-
-import (
- "fmt"
- "github.com/beanstalkd/go-beanstalk"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type tube struct {
- active int32
- pipe *jobs.Pipeline
- mut sync.Mutex
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
- reserve time.Duration
-
- // tube events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id uint64
- data []byte
-}
-
-func (e *entry) String() string {
- return fmt.Sprintf("%v", e.id)
-}
-
-// create new tube consumer and producer
-func newTube(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*tube, error) {
- if pipe.String("tube", "") == "" {
- return nil, fmt.Errorf("missing `tube` parameter on beanstalk pipeline")
- }
-
- return &tube{
- pipe: pipe,
- tube: &beanstalk.Tube{Name: pipe.String("tube", "")},
- tubeSet: beanstalk.NewTubeSet(nil, pipe.String("tube", "")),
- reserve: pipe.Duration("reserve", time.Second),
- lsn: lsn,
- }, nil
-}
-
-// run consumers
-func (t *tube) serve(connector connFactory) {
- // tube specific consume connection
- cn, err := connector.newConn()
- if err != nil {
- t.report(err)
- return
- }
- defer cn.Close()
-
- t.wait = make(chan interface{})
- atomic.StoreInt32(&t.active, 1)
-
- for {
- e, err := t.consume(cn)
- if err != nil {
- if isConnError(err) {
- t.report(err)
- }
- continue
- }
-
- if e == nil {
- return
- }
-
- h := <-t.execPool
- go func(h jobs.Handler, e *entry) {
- err := t.do(cn, h, e)
- t.execPool <- h
- t.wg.Done()
- t.report(err)
- }(h, e)
- }
-}
-
-// fetch consume
-func (t *tube) consume(cn *conn) (*entry, error) {
- t.muw.Lock()
- defer t.muw.Unlock()
-
- select {
- case <-t.wait:
- return nil, nil
- default:
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.tubeSet.Conn = conn
-
- id, data, err := t.tubeSet.Reserve(t.reserve)
- cn.release(err)
-
- if err != nil {
- return nil, err
- }
-
- t.wg.Add(1)
- return &entry{id: id, data: data}, nil
- }
-}
-
-// do data
-func (t *tube) do(cn *conn, h jobs.Handler, e *entry) error {
- j, err := unpack(e.data)
- if err != nil {
- return err
- }
-
- err = h(e.String(), j)
-
- // mandatory acquisition
- conn, connErr := cn.acquire(true)
- if connErr != nil {
- // possible if server is dead
- return connErr
- }
-
- if err == nil {
- return cn.release(conn.Delete(e.id))
- }
-
- stat, statErr := conn.StatsJob(e.id)
- if statErr != nil {
- return cn.release(statErr)
- }
-
- t.errHandler(e.String(), j, err)
-
- reserves, ok := strconv.Atoi(stat["reserves"])
- if ok != nil || !j.Options.CanRetry(reserves-1) {
- return cn.release(conn.Bury(e.id, 0))
- }
-
- return cn.release(conn.Release(e.id, 0, j.Options.RetryDuration()))
-}
-
-// stop tube consuming
-func (t *tube) stop() {
- if atomic.LoadInt32(&t.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&t.active, 0)
-
- close(t.wait)
-
- t.muw.Lock()
- t.wg.Wait()
- t.muw.Unlock()
-}
-
-// put data into pool or return error (no wait), this method will try to reattempt operation if
-// dead conn found.
-func (t *tube) put(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- id, err = t.doPut(cn, attempt, data, delay, rrt)
- if err != nil && isConnError(err) {
- return t.doPut(cn, attempt, data, delay, rrt)
- }
-
- return id, err
-}
-
-// perform put operation
-func (t *tube) doPut(cn *conn, attempt int, data []byte, delay, rrt time.Duration) (id string, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return "", err
- }
-
- var bid uint64
-
- t.mut.Lock()
- t.tube.Conn = conn
- bid, err = t.tube.Put(data, 0, delay, rrt)
- t.mut.Unlock()
-
- return strconv.FormatUint(bid, 10), cn.release(err)
-}
-
-// return tube stats (retries)
-func (t *tube) stat(cn *conn) (stat *jobs.Stat, err error) {
- stat, err = t.doStat(cn)
- if err != nil && isConnError(err) {
- return t.doStat(cn)
- }
-
- return stat, err
-}
-
-// return tube stats
-func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
- conn, err := cn.acquire(false)
- if err != nil {
- return nil, err
- }
-
- t.mut.Lock()
- t.tube.Conn = conn
- values, err := t.tube.Stats()
- t.mut.Unlock()
-
- if err != nil {
- return nil, cn.release(err)
- }
-
- stat = &jobs.Stat{InternalName: t.tube.Name}
-
- if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
- stat.Queue = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
- stat.Active = int64(v)
- }
-
- if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
- stat.Delayed = int64(v)
- }
-
- return stat, cn.release(nil)
-}
-
-// report tube specific error
-func (t *tube) report(err error) {
- if err != nil {
- t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/beanstalk/tube_test.go b/plugins/jobs/oooold/broker/beanstalk/tube_test.go
deleted file mode 100644
index b6a646f4..00000000
--- a/plugins/jobs/oooold/broker/beanstalk/tube_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package beanstalk
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestTube_CantServe(t *testing.T) {
- var gctx interface{}
- tube := &tube{
- lsn: func(event int, ctx interface{}) {
- gctx = ctx
- },
- }
-
- tube.serve(&Config{Addr: "broken"})
- assert.Error(t, gctx.(error))
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker.go b/plugins/jobs/oooold/broker/ephemeral/broker.go
deleted file mode 100644
index 385bb175..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker run queue using local goroutines.
-type Broker struct {
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *Broker) Init() (bool, error) {
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() error {
- // start consuming
- b.mu.Lock()
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve()
- }
- }
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- // stop all consuming
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.wait != nil {
- if q.execPool != nil {
- go q.serve()
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q.push(id.String(), j, 0, j.Options.DelayDuration())
-
- return id.String(), nil
-}
-
-// Stat must consume statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(), nil
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/broker_test.go b/plugins/jobs/oooold/broker/ephemeral/broker_test.go
deleted file mode 100644
index c1b40276..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/broker_test.go
+++ /dev/null
@@ -1,221 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "local",
- "name": "default",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init()
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- b.Init()
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- err = b.Consume(pipe, exec, errf)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/consume_test.go b/plugins/jobs/oooold/broker/ephemeral/consume_test.go
deleted file mode 100644
index d764a984..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/consume_test.go
+++ /dev/null
@@ -1,253 +0,0 @@
-package ephemeral
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/queue.go b/plugins/jobs/oooold/broker/ephemeral/queue.go
deleted file mode 100644
index a24bc216..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/queue.go
+++ /dev/null
@@ -1,161 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- on int32
- state *jobs.Stat
-
- // job pipeline
- concurPool chan interface{}
- jobs chan *entry
-
- // on operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-type entry struct {
- id string
- job *jobs.Job
- attempt int
-}
-
-// create new queue
-func newQueue(maxConcur int) *queue {
- q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
-
- if maxConcur != 0 {
- q.concurPool = make(chan interface{}, maxConcur)
- for i := 0; i < maxConcur; i++ {
- q.concurPool <- nil
- }
- }
-
- return q
-}
-
-// serve consumers
-func (q *queue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.on, 1)
-
- for {
- e := q.consume()
- if e == nil {
- q.wg.Wait()
- return
- }
-
- if q.concurPool != nil {
- <-q.concurPool
- }
-
- atomic.AddInt64(&q.state.Active, 1)
- h := <-q.execPool
-
- go func(h jobs.Handler, e *entry) {
- defer q.wg.Done()
-
- q.do(h, e)
- atomic.AddInt64(&q.state.Active, ^int64(0))
-
- q.execPool <- h
-
- if q.concurPool != nil {
- q.concurPool <- nil
- }
- }(h, e)
- }
-}
-
-// allocate one job entry
-func (q *queue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *queue) do(h jobs.Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.state.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.on) == 0 {
- return
- }
-
- close(q.wait)
-
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-
- atomic.StoreInt32(&q.on, 0)
-}
-
-// add job to the queue
-func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.state.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.state.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.state.Delayed, ^int64(0))
- atomic.AddInt64(&q.state.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *queue) stat() *jobs.Stat {
- return &jobs.Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.state.Queue),
- Active: atomic.LoadInt64(&q.state.Active),
- Delayed: atomic.LoadInt64(&q.state.Delayed),
- }
-}
diff --git a/plugins/jobs/oooold/broker/ephemeral/stat_test.go b/plugins/jobs/oooold/broker/ephemeral/stat_test.go
deleted file mode 100644
index 0894323c..00000000
--- a/plugins/jobs/oooold/broker/ephemeral/stat_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package ephemeral
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init()
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- stat, err := b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(1), stat.Active)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- stat, err = b.Stat(pipe)
- assert.NoError(t, err)
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, int64(0), stat.Active)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/broker.go b/plugins/jobs/oooold/broker/sqs/broker.go
deleted file mode 100644
index 8cc62b6b..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "sync"
-)
-
-// Broker represents SQS broker.
-type Broker struct {
- cfg *Config
- sqs *sqs.SQS
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*jobs.Pipeline]*queue
-}
-
-// Listen attaches server event watcher.
-func (b *Broker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures SQS broker.
-func (b *Broker) Init(cfg *Config) (ok bool, err error) {
- b.cfg = cfg
- b.queues = make(map[*jobs.Pipeline]*queue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *Broker) Register(pipe *jobs.Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
- }
-
- q, err := newQueue(pipe, b.throw)
- if err != nil {
- return err
- }
-
- b.queues[pipe] = q
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *Broker) Serve() (err error) {
- b.mu.Lock()
-
- b.sqs, err = b.cfg.SQS()
- if err != nil {
- return err
- }
-
- for _, q := range b.queues {
- q.url, err = q.declareQueue(b.sqs)
- if err != nil {
- return err
- }
- }
-
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve(b.sqs, b.cfg.TimeoutDuration())
- }
- }
-
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(jobs.EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *Broker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- for _, q := range b.queues {
- q.stop()
- }
-
- b.wait <- nil
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
-// the service is started!
-func (b *Broker) Consume(pipe *jobs.Pipeline, execPool chan jobs.Handler, errHandler jobs.ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.sqs != nil && q.execPool != nil {
- go q.serve(b.sqs, b.cfg.TimeoutDuration())
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *Broker) Push(pipe *jobs.Pipeline, j *jobs.Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- if j.Options.Delay > 900 || j.Options.RetryDelay > 900 {
- return "", fmt.Errorf("unable to push into `%s`, maximum delay value is 900", pipe.Name())
- }
-
- return q.send(b.sqs, j)
-}
-
-// Stat must fetch statistics about given pipeline or return error.
-func (b *Broker) Stat(pipe *jobs.Pipeline) (stat *jobs.Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined queue `%s`", pipe.Name())
- }
-
- return q.stat(b.sqs)
-}
-
-// check if broker is serving
-func (b *Broker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// queue returns queue associated with the pipeline.
-func (b *Broker) queue(pipe *jobs.Pipeline) *queue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *Broker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/broker_test.go b/plugins/jobs/oooold/broker/sqs/broker_test.go
deleted file mode 100644
index c87b302d..00000000
--- a/plugins/jobs/oooold/broker/sqs/broker_test.go
+++ /dev/null
@@ -1,275 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-var (
- pipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-
- cfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9324",
- }
-)
-
-func TestBroker_Init(t *testing.T) {
- b := &Broker{}
- ok, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.True(t, ok)
- assert.NoError(t, err)
-}
-
-func TestBroker_StopNotStarted(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- b.Stop()
-}
-
-func TestBroker_Register(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
-}
-
-func TestBroker_RegisterInvalid(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.Error(t, b.Register(&jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- }))
-}
-
-func TestBroker_Register_Twice(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Register(pipe))
- assert.Error(t, b.Register(pipe))
-}
-
-func TestBroker_Consume_Nil_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- assert.NoError(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_Undefined(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- assert.Error(t, b.Consume(pipe, nil, nil))
-}
-
-func TestBroker_Consume_BeforeServe(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- assert.NoError(t, b.Consume(pipe, exec, errf))
-}
-
-func TestBroker_Consume_Serve_Nil_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- err = b.Consume(pipe, nil, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_Stop(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- wait := make(chan interface{})
- go func() {
- assert.NoError(t, b.Serve())
- close(wait)
- }()
- time.Sleep(time.Millisecond * 100)
- b.Stop()
-
- <-wait
-}
-
-func TestBroker_Consume_Serve_InvalidQueue(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "invalid",
- "declare": map[string]interface{}{
- "VisibilityTimeout": "invalid",
- },
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- exec := make(chan jobs.Handler)
- errf := func(id string, j *jobs.Job, err error) {}
-
- b.Consume(pipe, exec, errf)
-
- assert.Error(t, b.Serve())
-}
-
-func TestBroker_PushToNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRunning(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
-
-func TestBroker_PushToNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Push(pipe, &jobs.Job{})
- assert.Error(t, err)
-}
-
-func TestBroker_StatNotRegistered(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, err = b.Stat(pipe)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config.go b/plugins/jobs/oooold/broker/sqs/config.go
deleted file mode 100644
index d0c2f2b2..00000000
--- a/plugins/jobs/oooold/broker/sqs/config.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/aws/credentials"
- "github.com/aws/aws-sdk-go/aws/session"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/roadrunner/service"
- "time"
-)
-
-// Config defines sqs broker configuration.
-type Config struct {
- // Region defined SQS region, not required when endpoint is not empty.
- Region string
-
- // Region defined AWS API key, not required when endpoint is not empty.
- Key string
-
- // Region defined AWS API secret, not required when endpoint is not empty.
- Secret string
-
- // Endpoint can be used to re-define SQS endpoint to custom location. Only for local development.
- Endpoint string
-
- // Timeout to allocate the connection. Default 10 seconds.
- Timeout int
-}
-
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Region == "" {
- return fmt.Errorf("SQS region is missing")
- }
-
- if c.Key == "" {
- return fmt.Errorf("SQS key is missing")
- }
-
- if c.Secret == "" {
- return fmt.Errorf("SQS secret is missing")
- }
-
- return nil
-}
-
-// TimeoutDuration returns number of seconds allowed to allocate the connection.
-func (c *Config) TimeoutDuration() time.Duration {
- timeout := c.Timeout
- if timeout == 0 {
- timeout = 10
- }
-
- return time.Duration(timeout) * time.Second
-}
-
-// Session returns new AWS session.
-func (c *Config) Session() (*session.Session, error) {
- return session.NewSession(&aws.Config{
- Region: aws.String(c.Region),
- Credentials: credentials.NewStaticCredentials(c.Key, c.Secret, ""),
- })
-}
-
-// SQS returns new SQS instance or error.
-func (c *Config) SQS() (*sqs.SQS, error) {
- sess, err := c.Session()
- if err != nil {
- return nil, err
- }
-
- if c.Endpoint == "" {
- return sqs.New(sess), nil
- }
-
- return sqs.New(sess, &aws.Config{Endpoint: aws.String(c.Endpoint)}), nil
-}
diff --git a/plugins/jobs/oooold/broker/sqs/config_test.go b/plugins/jobs/oooold/broker/sqs/config_test.go
deleted file mode 100644
index b36b3c6f..00000000
--- a/plugins/jobs/oooold/broker/sqs/config_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package sqs
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error3(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error4(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key"}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error5(t *testing.T) {
- cfg := &mockCfg{`{"region":"us-east-1","key":"key","secret":"secret"}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/consume_test.go b/plugins/jobs/oooold/broker/sqs/consume_test.go
deleted file mode 100644
index 434fc6ea..00000000
--- a/plugins/jobs/oooold/broker/sqs/consume_test.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestBroker_Consume_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_JobUseExistedPipeline(t *testing.T) {
- pipe := &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- }
-
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_PushTooBigDelay(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- Delay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_Consume_PushTooBigDelay2(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- _, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{
- RetryDelay: 901,
- },
- })
-
- assert.Error(t, perr)
-}
-
-func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Consume_Delayed(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Delay: 1},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- start := time.Now()
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-
- elapsed := time.Since(start)
- assert.True(t, elapsed > time.Second)
- assert.True(t, elapsed < 2*time.Second)
-}
-
-func TestBroker_Consume_Errored(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- close(errHandled)
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return fmt.Errorf("job failed")
- }
-
- <-waitJob
- <-errHandled
- b.Stop()
-}
-
-func TestBroker_Consume_Errored_Attempts(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- attempts := 0
- errHandled := make(chan interface{})
- errHandler := func(id string, j *jobs.Job, err error) {
- assert.Equal(t, "job failed", err.Error())
- attempts++
- errHandled <- nil
- }
-
- exec := make(chan jobs.Handler, 1)
-
- assert.NoError(t, b.Consume(pipe, exec, errHandler))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Attempts: 3},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- return fmt.Errorf("job failed")
- }
-
- <-errHandled
- <-errHandled
- <-errHandled
- assert.Equal(t, 3, attempts)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/durability_test.go b/plugins/jobs/oooold/broker/sqs/durability_test.go
deleted file mode 100644
index 58ddf4b9..00000000
--- a/plugins/jobs/oooold/broker/sqs/durability_test.go
+++ /dev/null
@@ -1,588 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "io"
- "net"
- "sync"
- "testing"
- "time"
-)
-
-var (
- proxyCfg = &Config{
- Key: "api-key",
- Secret: "api-secret",
- Region: "us-west-1",
- Endpoint: "http://localhost:9325",
- Timeout: 1,
- }
-
- proxy = &tcpProxy{
- listen: "localhost:9325",
- upstream: "localhost:9324",
- accept: true,
- }
-
- proxyPipe = &jobs.Pipeline{
- "broker": "sqs",
- "name": "default",
- "queue": "test",
- "lockReserved": 1,
- "declare": map[string]interface{}{
- "MessageRetentionPeriod": 86400,
- },
- }
-)
-
-type tcpProxy struct {
- listen string
- upstream string
- mu sync.Mutex
- accept bool
- conn []net.Conn
-}
-
-func (p *tcpProxy) serve() {
- l, err := net.Listen("tcp", p.listen)
- if err != nil {
- panic(err)
- }
-
- for {
- in, err := l.Accept()
- if err != nil {
- panic(err)
- }
-
- if !p.accepting() {
- in.Close()
- }
-
- up, err := net.Dial("tcp", p.upstream)
- if err != nil {
- panic(err)
- }
-
- go io.Copy(in, up)
- go io.Copy(up, in)
-
- p.mu.Lock()
- p.conn = append(p.conn, in, up)
- p.mu.Unlock()
- }
-}
-
-// wait for specific number of connections
-func (p *tcpProxy) waitConn(count int) *tcpProxy {
- p.mu.Lock()
- p.accept = true
- p.mu.Unlock()
-
- for {
- p.mu.Lock()
- current := len(p.conn)
- p.mu.Unlock()
-
- if current >= count*2 {
- break
- }
-
- time.Sleep(time.Millisecond)
- }
-
- return p
-}
-
-func (p *tcpProxy) reset(accept bool) int {
- p.mu.Lock()
- p.accept = accept
- defer p.mu.Unlock()
-
- count := 0
- for _, conn := range p.conn {
- conn.Close()
- count++
- }
-
- p.conn = nil
- return count / 2
-}
-
-func (p *tcpProxy) accepting() bool {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- return p.accept
-}
-
-func init() {
- go proxy.serve()
-}
-
-func TestBroker_Durability_Base(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- // expect 2 connections
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
- close(waitJob)
- return nil
- }
-
- <-waitJob
-}
-
-func TestBroker_Durability_Consume(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume_LongTimeout(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- time.Sleep(3 * time.Second)
- proxy.waitConn(1)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume2(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{},
- })
-
- assert.Error(t, perr)
-
- // restore
- proxy.waitConn(2)
-
- jid, perr = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- proxy.reset(false)
-
- _, serr = b.Stat(proxyPipe)
- assert.Error(t, serr)
-
- proxy.reset(true)
-
- _, serr = b.Stat(proxyPipe)
- assert.NoError(t, serr)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume3(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- jid, perr := b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(1), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 1 {
- break
- }
- }
-}
-
-func TestBroker_Durability_Consume4(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- proxy.waitConn(1)
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "kill",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = b.Push(proxyPipe, &jobs.Job{
- Job: "test",
- Payload: "body",
- Options: &jobs.Options{Timeout: 2},
- })
- if err != nil {
- t.Fatal(err)
- }
-
- st, serr := b.Stat(proxyPipe)
- assert.NoError(t, serr)
- assert.Equal(t, int64(3), st.Queue+st.Active)
-
- mu := sync.Mutex{}
- done := make(map[string]bool)
- exec <- func(id string, j *jobs.Job) error {
- mu.Lock()
- defer mu.Unlock()
- done[id] = true
-
- if j.Payload == "kill" {
- proxy.reset(true)
- }
-
- return nil
- }
-
- for {
- mu.Lock()
- num := len(done)
- mu.Unlock()
-
- if num >= 3 {
- break
- }
- }
-}
-
-func TestBroker_Durability_StopDead(t *testing.T) {
- defer proxy.reset(true)
-
- b := &Broker{}
- _, err := b.Init(proxyCfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(proxyPipe)
- if err != nil {
- t.Fatal(err)
- }
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
- assert.NoError(t, b.Consume(proxyPipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- go func() { assert.NoError(t, b.Serve()) }()
-
- <-ready
-
- proxy.waitConn(1).reset(false)
-
- b.Stop()
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go
deleted file mode 100644
index 50e2c164..00000000
--- a/plugins/jobs/oooold/broker/sqs/job.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "time"
-)
-
-var jobAttributes = []*string{
- aws.String("rr-job"),
- aws.String("rr-maxAttempts"),
- aws.String("rr-delay"),
- aws.String("rr-timeout"),
- aws.String("rr-retryDelay"),
-}
-
-// pack job metadata into headers
-func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
- return &sqs.SendMessageInput{
- QueueUrl: url,
- DelaySeconds: aws.Int64(int64(j.Options.Delay)),
- MessageBody: aws.String(j.Payload),
- MessageAttributes: map[string]*sqs.MessageAttributeValue{
- "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
- "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
- "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
- "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
- "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
- },
- }
-}
-
-// unpack restores jobs.Options
-func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
- if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
- return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
- }
- attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
-
- for _, attr := range jobAttributes {
- if _, ok := msg.MessageAttributes[*attr]; !ok {
- return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
- }
- }
-
- j = &jobs.Job{
- Job: *msg.MessageAttributes["rr-job"].StringValue,
- Payload: *msg.Body,
- Options: &jobs.Options{},
- }
-
- if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
- j.Options.Delay = delay
- }
-
- if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
- j.Options.Attempts = maxAttempts
- }
-
- if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
- j.Options.Timeout = timeout
- }
-
- if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
- j.Options.RetryDelay = retryDelay
- }
-
- return *msg.MessageId, attempt - 1, j, nil
-}
-
-func awsString(n int) *string {
- return aws.String(strconv.Itoa(n))
-}
-
-func awsDuration(d time.Duration) *string {
- return aws.String(strconv.Itoa(int(d.Seconds())))
-}
diff --git a/plugins/jobs/oooold/broker/sqs/job_test.go b/plugins/jobs/oooold/broker/sqs/job_test.go
deleted file mode 100644
index a120af53..00000000
--- a/plugins/jobs/oooold/broker/sqs/job_test.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package sqs
-
-import (
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Unpack(t *testing.T) {
- msg := &sqs.Message{
- Body: aws.String("body"),
- Attributes: map[string]*string{},
- MessageAttributes: map[string]*sqs.MessageAttributeValue{},
- }
-
- _, _, _, err := unpack(msg)
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/broker/sqs/queue.go b/plugins/jobs/oooold/broker/sqs/queue.go
deleted file mode 100644
index 8a92448e..00000000
--- a/plugins/jobs/oooold/broker/sqs/queue.go
+++ /dev/null
@@ -1,266 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type queue struct {
- active int32
- pipe *jobs.Pipeline
- url *string
- reserve time.Duration
- lockReserved time.Duration
-
- // queue events
- lsn func(event int, ctx interface{})
-
- // stop channel
- wait chan interface{}
-
- // active operations
- muw sync.RWMutex
- wg sync.WaitGroup
-
- // exec handlers
- execPool chan jobs.Handler
- errHandler jobs.ErrorHandler
-}
-
-func newQueue(pipe *jobs.Pipeline, lsn func(event int, ctx interface{})) (*queue, error) {
- if pipe.String("queue", "") == "" {
- return nil, fmt.Errorf("missing `queue` parameter on sqs pipeline `%s`", pipe.Name())
- }
-
- return &queue{
- pipe: pipe,
- reserve: pipe.Duration("reserve", time.Second),
- lockReserved: pipe.Duration("lockReserved", 300*time.Second),
- lsn: lsn,
- }, nil
-}
-
-// declareQueue declared queue
-func (q *queue) declareQueue(s *sqs.SQS) (*string, error) {
- attr := make(map[string]*string)
- for k, v := range q.pipe.Map("declare") {
- if vs, ok := v.(string); ok {
- attr[k] = aws.String(vs)
- }
-
- if vi, ok := v.(int); ok {
- attr[k] = aws.String(strconv.Itoa(vi))
- }
-
- if vb, ok := v.(bool); ok {
- if vb {
- attr[k] = aws.String("true")
- } else {
- attr[k] = aws.String("false")
- }
- }
- }
-
- if len(attr) != 0 {
- r, err := s.CreateQueue(&sqs.CreateQueueInput{
- QueueName: aws.String(q.pipe.String("queue", "")),
- Attributes: attr,
- })
-
- return r.QueueUrl, err
- }
-
- // no need to create (get existed)
- r, err := s.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(q.pipe.String("queue", ""))})
- if err != nil {
- return nil, err
- }
-
- return r.QueueUrl, nil
-}
-
-// serve consumers
-func (q *queue) serve(s *sqs.SQS, tout time.Duration) {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.active, 1)
-
- var errored bool
- for {
- messages, stop, err := q.consume(s)
- if err != nil {
- if errored {
- // reoccurring error
- time.Sleep(tout)
- } else {
- errored = true
- q.report(err)
- }
-
- continue
- }
- errored = false
-
- if stop {
- return
- }
-
- for _, msg := range messages {
- h := <-q.execPool
- go func(h jobs.Handler, msg *sqs.Message) {
- err := q.do(s, h, msg)
- q.execPool <- h
- q.wg.Done()
- q.report(err)
- }(h, msg)
- }
- }
-}
-
-// consume and allocate connection.
-func (q *queue) consume(s *sqs.SQS) ([]*sqs.Message, bool, error) {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil, true, nil
- default:
- r, err := s.ReceiveMessage(&sqs.ReceiveMessageInput{
- QueueUrl: q.url,
- MaxNumberOfMessages: aws.Int64(int64(q.pipe.Integer("prefetch", 1))),
- WaitTimeSeconds: aws.Int64(int64(q.reserve.Seconds())),
- VisibilityTimeout: aws.Int64(int64(q.lockReserved.Seconds())),
- AttributeNames: []*string{aws.String("ApproximateReceiveCount")},
- MessageAttributeNames: jobAttributes,
- })
- if err != nil {
- return nil, false, err
- }
-
- q.wg.Add(len(r.Messages))
-
- return r.Messages, false, nil
- }
-}
-
-// do single message
-func (q *queue) do(s *sqs.SQS, h jobs.Handler, msg *sqs.Message) (err error) {
- id, attempt, j, err := unpack(msg)
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- // block the job based on known timeout
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.TimeoutDuration().Seconds())),
- })
- if err != nil {
- go q.deleteMessage(s, msg, err)
- return err
- }
-
- err = h(id, j)
- if err == nil {
- return q.deleteMessage(s, msg, nil)
- }
-
- q.errHandler(id, j, err)
-
- if !j.Options.CanRetry(attempt) {
- return q.deleteMessage(s, msg, err)
- }
-
- // retry after specified duration
- _, err = s.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
- QueueUrl: q.url,
- ReceiptHandle: msg.ReceiptHandle,
- VisibilityTimeout: aws.Int64(int64(j.Options.RetryDelay)),
- })
-
- return err
-}
-
-func (q *queue) deleteMessage(s *sqs.SQS, msg *sqs.Message, err error) error {
- _, drr := s.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: q.url, ReceiptHandle: msg.ReceiptHandle})
- return drr
-}
-
-// stop the queue consuming
-func (q *queue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- close(q.wait)
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// add job to the queue
-func (q *queue) send(s *sqs.SQS, j *jobs.Job) (string, error) {
- r, err := s.SendMessage(pack(q.url, j))
- if err != nil {
- return "", err
- }
-
- return *r.MessageId, nil
-}
-
-// return queue stats
-func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
- r, err := s.GetQueueAttributes(&sqs.GetQueueAttributesInput{
- QueueUrl: q.url,
- AttributeNames: []*string{
- aws.String("ApproximateNumberOfMessages"),
- aws.String("ApproximateNumberOfMessagesDelayed"),
- aws.String("ApproximateNumberOfMessagesNotVisible"),
- },
- })
-
- if err != nil {
- return nil, err
- }
-
- stat = &jobs.Stat{InternalName: q.pipe.String("queue", "")}
-
- for a, v := range r.Attributes {
- if a == "ApproximateNumberOfMessages" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Queue = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesNotVisible" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Active = int64(v)
- }
- }
-
- if a == "ApproximateNumberOfMessagesDelayed" {
- if v, err := strconv.Atoi(*v); err == nil {
- stat.Delayed = int64(v)
- }
- }
- }
-
- return stat, nil
-}
-
-// throw handles service, server and pool events.
-func (q *queue) report(err error) {
- if err != nil {
- q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
- }
-}
diff --git a/plugins/jobs/oooold/broker/sqs/stat_test.go b/plugins/jobs/oooold/broker/sqs/stat_test.go
deleted file mode 100644
index 5031571b..00000000
--- a/plugins/jobs/oooold/broker/sqs/stat_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package sqs
-
-import (
- "github.com/spiral/jobs/v2"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestBroker_Stat(t *testing.T) {
- b := &Broker{}
- _, err := b.Init(cfg)
- if err != nil {
- t.Fatal(err)
- }
- err = b.Register(pipe)
- if err != nil {
- t.Fatal(err)
- }
-
- ready := make(chan interface{})
- b.Listen(func(event int, ctx interface{}) {
- if event == jobs.EventBrokerReady {
- close(ready)
- }
- })
-
- exec := make(chan jobs.Handler, 1)
-
- go func() { assert.NoError(t, b.Serve()) }()
- defer b.Stop()
-
- <-ready
-
- jid, perr := b.Push(pipe, &jobs.Job{Job: "test", Payload: "body", Options: &jobs.Options{}})
-
- assert.NotEqual(t, "", jid)
- assert.NoError(t, perr)
-
- // unable to use approximated stats
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-
- assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
-
- waitJob := make(chan interface{})
- exec <- func(id string, j *jobs.Job) error {
- assert.Equal(t, jid, id)
- assert.Equal(t, "body", j.Payload)
-
- _, err := b.Stat(pipe)
- assert.NoError(t, err)
-
- close(waitJob)
- return nil
- }
-
- <-waitJob
- _, err = b.Stat(pipe)
- assert.NoError(t, err)
-}
diff --git a/plugins/jobs/oooold/broker_test.go b/plugins/jobs/oooold/broker_test.go
deleted file mode 100644
index b93eac51..00000000
--- a/plugins/jobs/oooold/broker_test.go
+++ /dev/null
@@ -1,314 +0,0 @@
-package oooold
-
-import (
- "fmt"
- "github.com/gofrs/uuid"
- "sync"
- "sync/atomic"
- "time"
-)
-
-// testBroker run testQueue using local goroutines.
-type testBroker struct {
- lsn func(event int, ctx interface{})
- mu sync.Mutex
- wait chan error
- stopped chan interface{}
- queues map[*Pipeline]*testQueue
-}
-
-// Listen attaches server event watcher.
-func (b *testBroker) Listen(lsn func(event int, ctx interface{})) {
- b.lsn = lsn
-}
-
-// Init configures broker.
-func (b *testBroker) Init() (bool, error) {
- b.queues = make(map[*Pipeline]*testQueue)
-
- return true, nil
-}
-
-// Register broker pipeline.
-func (b *testBroker) Register(pipe *Pipeline) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if _, ok := b.queues[pipe]; ok {
- return fmt.Errorf("testQueue `%s` has already been registered", pipe.Name())
- }
-
- b.queues[pipe] = newQueue()
-
- return nil
-}
-
-// Serve broker pipelines.
-func (b *testBroker) Serve() error {
- // start pipelines
- b.mu.Lock()
- for _, q := range b.queues {
- qq := q
- if qq.execPool != nil {
- go qq.serve()
- }
- }
- b.wait = make(chan error)
- b.stopped = make(chan interface{})
- defer close(b.stopped)
-
- b.mu.Unlock()
-
- b.throw(EventBrokerReady, b)
-
- return <-b.wait
-}
-
-// Stop all pipelines.
-func (b *testBroker) Stop() {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return
- }
-
- // stop all pipelines
- for _, q := range b.queues {
- q.stop()
- }
-
- close(b.wait)
- <-b.stopped
-}
-
-// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
-// the service is started!
-func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- q.stop()
-
- q.execPool = execPool
- q.errHandler = errHandler
-
- if b.wait != nil {
- if q.execPool != nil {
- go q.serve()
- }
- }
-
- return nil
-}
-
-// Push job into the worker.
-func (b *testBroker) Push(pipe *Pipeline, j *Job) (string, error) {
- if err := b.isServing(); err != nil {
- return "", err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return "", fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- id, err := uuid.NewV4()
- if err != nil {
- return "", err
- }
-
- q.push(id.String(), j, 0, j.Options.DelayDuration())
-
- return id.String(), nil
-}
-
-// Stat must consume statistics about given pipeline or return error.
-func (b *testBroker) Stat(pipe *Pipeline) (stat *Stat, err error) {
- if err := b.isServing(); err != nil {
- return nil, err
- }
-
- q := b.queue(pipe)
- if q == nil {
- return nil, fmt.Errorf("undefined testQueue `%s`", pipe.Name())
- }
-
- return q.stat(), nil
-}
-
-// check if broker is serving
-func (b *testBroker) isServing() error {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.wait == nil {
- return fmt.Errorf("broker is not running")
- }
-
- return nil
-}
-
-// testQueue returns testQueue associated with the pipeline.
-func (b *testBroker) queue(pipe *Pipeline) *testQueue {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- q, ok := b.queues[pipe]
- if !ok {
- return nil
- }
-
- return q
-}
-
-// throw handles service, server and pool events.
-func (b *testBroker) throw(event int, ctx interface{}) {
- if b.lsn != nil {
- b.lsn(event, ctx)
- }
-}
-
-type testQueue struct {
- active int32
- st *Stat
-
- // job pipeline
- jobs chan *entry
-
- // pipelines operations
- muw sync.Mutex
- wg sync.WaitGroup
-
- // stop channel
- wait chan interface{}
-
- // exec handlers
- execPool chan Handler
- errHandler ErrorHandler
-}
-
-type entry struct {
- id string
- job *Job
- attempt int
-}
-
-// create new testQueue
-func newQueue() *testQueue {
- return &testQueue{st: &Stat{}, jobs: make(chan *entry)}
-}
-
-// todo NOT USED
-// associate testQueue with new do pool
-//func (q *testQueue) configure(execPool chan Handler, err ErrorHandler) error {
-// q.execPool = execPool
-// q.errHandler = err
-//
-// return nil
-//}
-
-// serve consumers
-func (q *testQueue) serve() {
- q.wait = make(chan interface{})
- atomic.StoreInt32(&q.active, 1)
-
- for {
- e := q.consume()
- if e == nil {
- return
- }
-
- atomic.AddInt64(&q.st.Active, 1)
- h := <-q.execPool
- go func(e *entry) {
- q.do(h, e)
- atomic.AddInt64(&q.st.Active, ^int64(0))
- q.execPool <- h
- q.wg.Done()
- }(e)
- }
-}
-
-// allocate one job entry
-func (q *testQueue) consume() *entry {
- q.muw.Lock()
- defer q.muw.Unlock()
-
- select {
- case <-q.wait:
- return nil
- case e := <-q.jobs:
- q.wg.Add(1)
-
- return e
- }
-}
-
-// do singe job
-func (q *testQueue) do(h Handler, e *entry) {
- err := h(e.id, e.job)
-
- if err == nil {
- atomic.AddInt64(&q.st.Queue, ^int64(0))
- return
- }
-
- q.errHandler(e.id, e.job, err)
-
- if !e.job.Options.CanRetry(e.attempt) {
- atomic.AddInt64(&q.st.Queue, ^int64(0))
- return
- }
-
- q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
-}
-
-// stop the testQueue pipelines
-func (q *testQueue) stop() {
- if atomic.LoadInt32(&q.active) == 0 {
- return
- }
-
- atomic.StoreInt32(&q.active, 0)
-
- close(q.wait)
- q.muw.Lock()
- q.wg.Wait()
- q.muw.Unlock()
-}
-
-// add job to the testQueue
-func (q *testQueue) push(id string, j *Job, attempt int, delay time.Duration) {
- if delay == 0 {
- atomic.AddInt64(&q.st.Queue, 1)
- go func() {
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-
- return
- }
-
- atomic.AddInt64(&q.st.Delayed, 1)
- go func() {
- time.Sleep(delay)
- atomic.AddInt64(&q.st.Delayed, ^int64(0))
- atomic.AddInt64(&q.st.Queue, 1)
-
- q.jobs <- &entry{id: id, job: j, attempt: attempt}
- }()
-}
-
-func (q *testQueue) stat() *Stat {
- return &Stat{
- InternalName: ":memory:",
- Queue: atomic.LoadInt64(&q.st.Queue),
- Active: atomic.LoadInt64(&q.st.Active),
- Delayed: atomic.LoadInt64(&q.st.Delayed),
- }
-}
diff --git a/plugins/jobs/oooold/config.go b/plugins/jobs/oooold/config.go
deleted file mode 100644
index cf40b6fb..00000000
--- a/plugins/jobs/oooold/config.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package oooold
-
-import (
- "fmt"
- "github.com/spiral/roadrunner"
- "github.com/spiral/roadrunner/service"
-)
-
-// Config defines settings for job broker, workers and job-pipeline mapping.
-type Config struct {
- // Workers configures roadrunner server and worker busy.
- Workers *roadrunner.ServerConfig
-
- // Dispatch defines where and how to match jobs.
- Dispatch map[string]*Options
-
- // Pipelines defines mapping between PHP job pipeline and associated job broker.
- Pipelines map[string]*Pipeline
-
- // Consuming specifies names of pipelines to be consumed on service start.
- Consume []string
-
- // parent config for broken options.
- parent service.Config
- pipelines Pipelines
- route Dispatcher
-}
-
-// Hydrate populates config values.
-func (c *Config) Hydrate(cfg service.Config) (err error) {
- c.Workers = &roadrunner.ServerConfig{}
- c.Workers.InitDefaults()
-
- if err := cfg.Unmarshal(&c); err != nil {
- return err
- }
-
- c.pipelines, err = initPipelines(c.Pipelines)
- if err != nil {
- return err
- }
-
- if c.Workers.Command != "" {
- if err := c.Workers.Pool.Valid(); err != nil {
- return c.Workers.Pool.Valid()
- }
- }
-
- c.parent = cfg
- c.route = initDispatcher(c.Dispatch)
-
- return nil
-}
-
-// MatchPipeline locates the pipeline associated with the job.
-func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
- opt := c.route.match(job)
-
- pipe := ""
- if job.Options != nil {
- pipe = job.Options.Pipeline
- }
-
- if pipe == "" && opt != nil {
- pipe = opt.Pipeline
- }
-
- if pipe == "" {
- return nil, nil, fmt.Errorf("unable to locate pipeline for `%s`", job.Job)
- }
-
- if p := c.pipelines.Get(pipe); p != nil {
- return p, opt, nil
- }
-
- return nil, nil, fmt.Errorf("undefined pipeline `%s`", pipe)
-}
-
-// Get underlying broker config.
-func (c *Config) Get(service string) service.Config {
- if c.parent == nil {
- return nil
- }
-
- return c.parent.Get(service)
-}
-
-// Unmarshal is doing nothing.
-func (c *Config) Unmarshal(out interface{}) error {
- return nil
-}
diff --git a/plugins/jobs/oooold/config_test.go b/plugins/jobs/oooold/config_test.go
deleted file mode 100644
index 5f14eb32..00000000
--- a/plugins/jobs/oooold/config_test.go
+++ /dev/null
@@ -1,158 +0,0 @@
-package oooold
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config {
- if name == "same" || name == "jobs" {
- return cfg
- }
-
- return nil
-}
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{cfg: `{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_OK(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{"pool":{"numWorkers": 1}}
-}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Unmarshal(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{"pool":{"numWorkers": 1}}
-}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- var i interface{}
- assert.Nil(t, c.Unmarshal(i))
-}
-
-func Test_Config_Hydrate_Get(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{"pool":{"numWorkers": 1}}
-}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- assert.Nil(t, c.Get("nil"))
-}
-
-func Test_Config_Hydrate_Get_Valid(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{"pool":{"numWorkers": 1}}
-}`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- assert.Equal(t, cfg, c.Get("same"))
-}
-
-func Test_Config_Hydrate_GetNoParent(t *testing.T) {
- c := &Config{}
- assert.Nil(t, c.Get("nil"))
-}
-
-func Test_Pipelines(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{
- "pool":{"numWorkers": 1}
- },
- "pipelines":{
- "pipe": {"broker":"broker"}
- },
- "dispatch":{
- "job.*": {"pipeline":"default"}
- }
- }`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- assert.Equal(t, "pipe", c.pipelines.Get("pipe").Name())
- assert.Equal(t, "broker", c.pipelines.Get("pipe").Broker())
-}
-
-func Test_Pipelines_NoBroker(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{
- "pool":{"numWorkers": 1}
- },
- "pipelines":{
- "pipe": {}
- },
- "dispatch":{
- "job.*": {"pipeline":"default"}
- }
- }`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_MatchPipeline(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{
- "pool":{"numWorkers": 1}
- },
- "pipelines":{
- "pipe": {"broker":"default"}
- },
- "dispatch":{
- "job.*": {"pipeline":"pipe","delay":10}
- }
- }`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, _, err := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{}})
- assert.Error(t, err)
-
- p, _, _ := c.MatchPipeline(&Job{Job: "undefined", Options: &Options{Pipeline: "pipe"}})
- assert.Equal(t, "pipe", p.Name())
-
- p, opt, _ := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}})
- assert.Equal(t, "pipe", p.Name())
- assert.Equal(t, 10, opt.Delay)
-}
-
-func Test_MatchPipeline_Error(t *testing.T) {
- cfg := &mockCfg{cfg: `{
- "workers":{
- "pool":{"numWorkers": 1}
- },
- "pipelines":{
- "pipe": {"broker":"default"}
- },
- "dispatch":{
- "job.*": {"pipeline":"missing"}
- }
- }`}
- c := &Config{}
-
- assert.NoError(t, c.Hydrate(cfg))
-
- _, _, err := c.MatchPipeline(&Job{Job: "job.abc", Options: &Options{}})
- assert.Error(t, err)
-}
diff --git a/plugins/jobs/oooold/dispatcher.go b/plugins/jobs/oooold/dispatcher.go
deleted file mode 100644
index 801e1973..00000000
--- a/plugins/jobs/oooold/dispatcher.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package oooold
-
-import (
- "strings"
-)
-
-var separators = []string{"/", "-", "\\"}
-
-// Dispatcher provides ability to automatically locate the pipeline for the specific job
-// and update job options (if none set).
-type Dispatcher map[string]*Options
-
-// pre-compile patterns
-func initDispatcher(routes map[string]*Options) Dispatcher {
- dispatcher := make(Dispatcher)
- for pattern, opts := range routes {
- pattern = strings.ToLower(pattern)
- pattern = strings.Trim(pattern, "-.*")
-
- for _, s := range separators {
- pattern = strings.Replace(pattern, s, ".", -1)
- }
-
- dispatcher[pattern] = opts
- }
-
- return dispatcher
-}
-
-// match clarifies target job pipeline and other job options. Can return nil.
-func (dispatcher Dispatcher) match(job *Job) (found *Options) {
- var best = 0
-
- jobName := strings.ToLower(job.Job)
- for pattern, opts := range dispatcher {
- if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
- found = opts
- best = len(pattern)
- }
- }
-
- if best == 0 {
- return nil
- }
-
- return found
-}
diff --git a/plugins/jobs/oooold/dispatcher_test.go b/plugins/jobs/oooold/dispatcher_test.go
deleted file mode 100644
index 92f8c956..00000000
--- a/plugins/jobs/oooold/dispatcher_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package oooold
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_Map_All(t *testing.T) {
- m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}})
- assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline)
-}
-
-func Test_Map_Miss(t *testing.T) {
- m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}})
-
- assert.Nil(t, m.match(&Job{Job: "miss"}))
-}
-
-func Test_Map_Best(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestUpper(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.Other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline)
-}
-
-func Test_Map_BestReversed(t *testing.T) {
- m := initDispatcher(map[string]*Options{
- "some.*": {Pipeline: "default"},
- "some.other.*": {Pipeline: "other"},
- })
-
- assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
- assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
- assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
-}
diff --git a/plugins/jobs/oooold/event.go b/plugins/jobs/oooold/event.go
deleted file mode 100644
index 6524712f..00000000
--- a/plugins/jobs/oooold/event.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package oooold
-
-import "time"
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK = iota + 1500
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeConsume when pipeline pipelines has been requested.
- EventPipeConsume
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStop when pipeline has begun stopping.
- EventPipeStop
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventBrokerReady thrown when broken is ready to accept/serve tasks.
- EventBrokerReady
-)
-
-// JobEvent represent job event.
-type JobEvent struct {
- // String is job id.
- ID string
-
- // Job is failed job.
- Job *Job
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *JobEvent) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// JobError represents singular Job error event.
-type JobError struct {
- // String is job id.
- ID string
-
- // Job is failed job.
- Job *Job
-
- // Caused contains job specific error.
- Caused error
-
- // event timings
- start time.Time
- elapsed time.Duration
-}
-
-// Elapsed returns duration of the invocation.
-func (e *JobError) Elapsed() time.Duration {
- return e.elapsed
-}
-
-// Caused returns error message.
-func (e *JobError) Error() string {
- return e.Caused.Error()
-}
-
-// PipelineError defines pipeline specific errors.
-type PipelineError struct {
- // Pipeline is associated pipeline.
- Pipeline *Pipeline
-
- // Caused send by broker.
- Caused error
-}
-
-// Error returns error message.
-func (e *PipelineError) Error() string {
- return e.Caused.Error()
-}
diff --git a/plugins/jobs/oooold/event_test.go b/plugins/jobs/oooold/event_test.go
deleted file mode 100644
index 82241124..00000000
--- a/plugins/jobs/oooold/event_test.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package oooold
-
-import (
- "errors"
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestJobEvent_Elapsed(t *testing.T) {
- e := &JobEvent{
- ID: "id",
- Job: &Job{},
- start: time.Now(),
- elapsed: time.Millisecond,
- }
-
- assert.Equal(t, time.Millisecond, e.Elapsed())
-}
-
-func TestJobError_Elapsed(t *testing.T) {
- e := &JobError{
- ID: "id",
- Job: &Job{},
- start: time.Now(),
- elapsed: time.Millisecond,
- }
-
- assert.Equal(t, time.Millisecond, e.Elapsed())
-}
-
-func TestJobError_Error(t *testing.T) {
- e := &JobError{
- ID: "id",
- Job: &Job{},
- start: time.Now(),
- elapsed: time.Millisecond,
- Caused: errors.New("error"),
- }
-
- assert.Equal(t, time.Millisecond, e.Elapsed())
- assert.Equal(t, "error", e.Error())
-}
-
-func TestPipelineError_Error(t *testing.T) {
- e := &PipelineError{
- Pipeline: &Pipeline{},
- Caused: errors.New("error"),
- }
-
- assert.Equal(t, "error", e.Error())
-}
diff --git a/plugins/jobs/oooold/job.go b/plugins/jobs/oooold/job.go
deleted file mode 100644
index 2f80c1cc..00000000
--- a/plugins/jobs/oooold/job.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package oooold
-
-import json "github.com/json-iterator/go"
-
-// Handler handles job execution.
-type Handler func(id string, j *Job) error
-
-// ErrorHandler handles job execution errors.
-type ErrorHandler func(id string, j *Job, err error)
-
-// Job carries information about single job.
-type Job struct {
- // Job contains name of job broker (usually PHP class).
- Job string `json:"job"`
-
- // Payload is string data (usually JSON) passed to Job broker.
- Payload string `json:"payload"`
-
- // Options contains set of PipelineOptions specific to job execution. Can be empty.
- Options *Options `json:"options,omitempty"`
-}
-
-// Body packs job payload into binary payload.
-func (j *Job) Body() []byte {
- return []byte(j.Payload)
-}
-
-// Context packs job context (job, id) into binary payload.
-func (j *Job) Context(id string) []byte {
- ctx, _ := json.Marshal(
- struct {
- ID string `json:"id"`
- Job string `json:"job"`
- }{ID: id, Job: j.Job},
- )
-
- return ctx
-}
diff --git a/plugins/jobs/oooold/job_options.go b/plugins/jobs/oooold/job_options.go
deleted file mode 100644
index 206bbfc4..00000000
--- a/plugins/jobs/oooold/job_options.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package oooold
-
-import "time"
-
-// Options carry information about how to handle given job.
-type Options struct {
- // Pipeline manually specified pipeline.
- Pipeline string `json:"pipeline,omitempty"`
-
- // Delay defines time duration to delay execution for. Defaults to none.
- Delay int `json:"delay,omitempty"`
-
- // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
- // Minimum valuable value is 2.
- Attempts int `json:"maxAttempts,omitempty"`
-
- // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
- RetryDelay int `json:"retryDelay,omitempty"`
-
- // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
- Timeout int `json:"timeout,omitempty"`
-}
-
-// Merge merges job options.
-func (o *Options) Merge(from *Options) {
- if o.Pipeline == "" {
- o.Pipeline = from.Pipeline
- }
-
- if o.Attempts == 0 {
- o.Attempts = from.Attempts
- }
-
- if o.Timeout == 0 {
- o.Timeout = from.Timeout
- }
-
- if o.RetryDelay == 0 {
- o.RetryDelay = from.RetryDelay
- }
-
- if o.Delay == 0 {
- o.Delay = from.Delay
- }
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int) bool {
- // Attempts 1 and 0 has identical effect
- return o.Attempts > (attempt + 1)
-}
-
-// RetryDuration returns retry delay duration in a form of time.Duration.
-func (o *Options) RetryDuration() time.Duration {
- return time.Second * time.Duration(o.RetryDelay)
-}
-
-// DelayDuration returns delay duration in a form of time.Duration.
-func (o *Options) DelayDuration() time.Duration {
- return time.Second * time.Duration(o.Delay)
-}
-
-// TimeoutDuration returns timeout duration in a form of time.Duration.
-func (o *Options) TimeoutDuration() time.Duration {
- if o.Timeout == 0 {
- return 30 * time.Minute
- }
-
- return time.Second * time.Duration(o.Timeout)
-}
diff --git a/plugins/jobs/oooold/job_options_test.go b/plugins/jobs/oooold/job_options_test.go
deleted file mode 100644
index 4575d959..00000000
--- a/plugins/jobs/oooold/job_options_test.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package oooold
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestOptions_CanRetry(t *testing.T) {
- opts := &Options{Attempts: 0}
-
- assert.False(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
-}
-
-func TestOptions_CanRetry_SameValue(t *testing.T) {
- opts := &Options{Attempts: 1}
-
- assert.False(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
-}
-
-func TestOptions_CanRetry_Value(t *testing.T) {
- opts := &Options{Attempts: 2}
-
- assert.True(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
- assert.False(t, opts.CanRetry(2))
-}
-
-func TestOptions_CanRetry_Value3(t *testing.T) {
- opts := &Options{Attempts: 3}
-
- assert.True(t, opts.CanRetry(0))
- assert.True(t, opts.CanRetry(1))
- assert.False(t, opts.CanRetry(2))
-}
-
-func TestOptions_RetryDuration(t *testing.T) {
- opts := &Options{RetryDelay: 0}
- assert.Equal(t, time.Duration(0), opts.RetryDuration())
-}
-
-func TestOptions_RetryDuration2(t *testing.T) {
- opts := &Options{RetryDelay: 1}
- assert.Equal(t, time.Second, opts.RetryDuration())
-}
-
-func TestOptions_DelayDuration(t *testing.T) {
- opts := &Options{Delay: 0}
- assert.Equal(t, time.Duration(0), opts.DelayDuration())
-}
-
-func TestOptions_DelayDuration2(t *testing.T) {
- opts := &Options{Delay: 1}
- assert.Equal(t, time.Second, opts.DelayDuration())
-}
-
-func TestOptions_TimeoutDuration(t *testing.T) {
- opts := &Options{Timeout: 0}
- assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
-}
-
-func TestOptions_TimeoutDuration2(t *testing.T) {
- opts := &Options{Timeout: 1}
- assert.Equal(t, time.Second, opts.TimeoutDuration())
-}
-
-func TestOptions_Merge(t *testing.T) {
- opts := &Options{}
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- Timeout: 1,
- Attempts: 1,
- RetryDelay: 1,
- })
-
- assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, 1, opts.Attempts)
- assert.Equal(t, 2, opts.Delay)
- assert.Equal(t, 1, opts.Timeout)
- assert.Equal(t, 1, opts.RetryDelay)
-}
-
-func TestOptions_MergeKeepOriginal(t *testing.T) {
- opts := &Options{
- Pipeline: "default",
- Delay: 10,
- Timeout: 10,
- Attempts: 10,
- RetryDelay: 10,
- }
-
- opts.Merge(&Options{
- Pipeline: "pipeline",
- Delay: 2,
- Timeout: 1,
- Attempts: 1,
- RetryDelay: 1,
- })
-
- assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, 10, opts.Attempts)
- assert.Equal(t, 10, opts.Delay)
- assert.Equal(t, 10, opts.Timeout)
- assert.Equal(t, 10, opts.RetryDelay)
-}
diff --git a/plugins/jobs/oooold/job_test.go b/plugins/jobs/oooold/job_test.go
deleted file mode 100644
index 5db924eb..00000000
--- a/plugins/jobs/oooold/job_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package oooold
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestJob_Body(t *testing.T) {
- j := &Job{Payload: "hello"}
-
- assert.Equal(t, []byte("hello"), j.Body())
-}
-
-func TestJob_Context(t *testing.T) {
- j := &Job{Job: "job"}
-
- assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
-}
diff --git a/plugins/jobs/oooold/pipeline.go b/plugins/jobs/oooold/pipeline.go
deleted file mode 100644
index c533f23d..00000000
--- a/plugins/jobs/oooold/pipeline.go
+++ /dev/null
@@ -1,169 +0,0 @@
-package oooold
-
-import (
- "fmt"
- "time"
-)
-
-// Pipelines is list of Pipeline.
-type Pipelines []*Pipeline
-
-func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
- out := make(Pipelines, 0)
-
- for name, pipe := range pipes {
- if pipe.Broker() == "" {
- return nil, fmt.Errorf("found the pipeline without defined broker")
- }
-
- p := pipe.With("name", name)
- out = append(out, &p)
- }
-
- return out, nil
-}
-
-// Reverse returns pipelines in reversed order.
-func (ps Pipelines) Reverse() Pipelines {
- out := make(Pipelines, len(ps))
-
- for i, p := range ps {
- out[len(ps)-i-1] = p
- }
-
- return out
-}
-
-// Broker return pipelines associated with specific broker.
-func (ps Pipelines) Broker(broker string) Pipelines {
- out := make(Pipelines, 0)
-
- for _, p := range ps {
- if p.Broker() != broker {
- continue
- }
-
- out = append(out, p)
- }
-
- return out
-}
-
-// Names returns only pipelines with specified names.
-func (ps Pipelines) Names(only ...string) Pipelines {
- out := make(Pipelines, 0)
-
- for _, name := range only {
- for _, p := range ps {
- if p.Name() == name {
- out = append(out, p)
- }
- }
- }
-
- return out
-}
-
-// Get returns pipeline by it'svc name.
-func (ps Pipelines) Get(name string) *Pipeline {
- // possibly optimize
- for _, p := range ps {
- if p.Name() == name {
- return p
- }
- }
-
- return nil
-}
-
-// Pipeline defines pipeline options.
-type Pipeline map[string]interface{}
-
-// With pipeline value. Immutable.
-func (p Pipeline) With(name string, value interface{}) Pipeline {
- out := make(map[string]interface{})
- for k, v := range p {
- out[k] = v
- }
- out[name] = value
-
- return out
-}
-
-// Name returns pipeline name.
-func (p Pipeline) Name() string {
- return p.String("name", "")
-}
-
-// Broker associated with the pipeline.
-func (p Pipeline) Broker() string {
- return p.String("broker", "")
-}
-
-// Has checks if value presented in pipeline.
-func (p Pipeline) Has(name string) bool {
- if _, ok := p[name]; ok {
- return true
- }
-
- return false
-}
-
-// Map must return nested map value or empty config.
-func (p Pipeline) Map(name string) Pipeline {
- out := make(map[string]interface{})
-
- if value, ok := p[name]; ok {
- if m, ok := value.(map[string]interface{}); ok {
- for k, v := range m {
- out[k] = v
- }
- }
- }
-
- return out
-}
-
-// Bool must return option value as string or return default value.
-func (p Pipeline) Bool(name string, d bool) bool {
- if value, ok := p[name]; ok {
- if b, ok := value.(bool); ok {
- return b
- }
- }
-
- return d
-}
-
-// String must return option value as string or return default value.
-func (p Pipeline) String(name string, d string) string {
- if value, ok := p[name]; ok {
- if str, ok := value.(string); ok {
- return str
- }
- }
-
- return d
-}
-
-// Integer must return option value as string or return default value.
-func (p Pipeline) Integer(name string, d int) int {
- if value, ok := p[name]; ok {
- if str, ok := value.(int); ok {
- return str
- }
- }
-
- return d
-}
-
-// Duration must return option value as time.Duration (seconds) or return default value.
-func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
- if value, ok := p[name]; ok {
- if str, ok := value.(int); ok {
- return time.Second * time.Duration(str)
- }
- }
-
- return d
-}
diff --git a/plugins/jobs/oooold/pipeline_test.go b/plugins/jobs/oooold/pipeline_test.go
deleted file mode 100644
index 0ace029f..00000000
--- a/plugins/jobs/oooold/pipeline_test.go
+++ /dev/null
@@ -1,89 +0,0 @@
-package oooold
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
- "time"
-)
-
-func TestPipeline_Map(t *testing.T) {
- pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
-
- assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
- assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
-}
-
-func TestPipeline_MapString(t *testing.T) {
- pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}
-
- assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
- assert.Equal(t, "", pipe.Map("other").String("alias", ""))
-}
-
-func TestPipeline_Bool(t *testing.T) {
- pipe := Pipeline{"value": true}
-
- assert.Equal(t, true, pipe.Bool("value", false))
- assert.Equal(t, true, pipe.Bool("other", true))
-}
-
-func TestPipeline_String(t *testing.T) {
- pipe := Pipeline{"value": "value"}
-
- assert.Equal(t, "value", pipe.String("value", ""))
- assert.Equal(t, "value", pipe.String("other", "value"))
-}
-
-func TestPipeline_Integer(t *testing.T) {
- pipe := Pipeline{"value": 1}
-
- assert.Equal(t, 1, pipe.Integer("value", 0))
- assert.Equal(t, 1, pipe.Integer("other", 1))
-}
-
-func TestPipeline_Duration(t *testing.T) {
- pipe := Pipeline{"value": 1}
-
- assert.Equal(t, time.Second, pipe.Duration("value", 0))
- assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
-}
-
-func TestPipeline_Has(t *testing.T) {
- pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
-
- assert.Equal(t, true, pipe.Has("options"))
- assert.Equal(t, false, pipe.Has("other"))
-}
-
-func TestPipeline_FilterBroker(t *testing.T) {
- pipes := Pipelines{
- &Pipeline{"name": "first", "broker": "a"},
- &Pipeline{"name": "second", "broker": "a"},
- &Pipeline{"name": "third", "broker": "b"},
- &Pipeline{"name": "forth", "broker": "b"},
- }
-
- filtered := pipes.Names("first", "third")
- assert.True(t, len(filtered) == 2)
-
- assert.Equal(t, "a", filtered[0].Broker())
- assert.Equal(t, "b", filtered[1].Broker())
-
- filtered = pipes.Names("first", "third").Reverse()
- assert.True(t, len(filtered) == 2)
-
- assert.Equal(t, "a", filtered[1].Broker())
- assert.Equal(t, "b", filtered[0].Broker())
-
- filtered = pipes.Broker("a")
- assert.True(t, len(filtered) == 2)
-
- assert.Equal(t, "first", filtered[0].Name())
- assert.Equal(t, "second", filtered[1].Name())
-
- filtered = pipes.Broker("a").Reverse()
- assert.True(t, len(filtered) == 2)
-
- assert.Equal(t, "first", filtered[1].Name())
- assert.Equal(t, "second", filtered[0].Name())
-}
diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go
deleted file mode 100644
index cc61fb7d..00000000
--- a/plugins/jobs/oooold/rpc.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package oooold
-
-import (
- "fmt"
-)
-
-type rpcServer struct{ svc *Service }
-
-// WorkerList contains list of workers.
-type WorkerList struct {
- // Workers is list of workers.
- Workers []*util.State `json:"workers"`
-}
-
-// PipelineList contains list of pipeline stats.
-type PipelineList struct {
- // Pipelines is list of pipeline stats.
- Pipelines []*Stat `json:"pipelines"`
-}
-
-// Push job to the testQueue.
-func (rpc *rpcServer) Push(j *Job, id *string) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- *id, err = rpc.svc.Push(j)
- return
-}
-
-// Push job to the testQueue.
-func (rpc *rpcServer) PushAsync(j *Job, ok *bool) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- *ok = true
- go rpc.svc.Push(j)
-
- return
-}
-
-// Reset resets underlying RR worker pool and restarts all of it's workers.
-func (rpc *rpcServer) Reset(reset bool, w *string) error {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- *w = "OK"
- return rpc.svc.rr.Reset()
-}
-
-// Destroy job pipelines for a given pipeline.
-func (rpc *rpcServer) Stop(pipeline string, w *string) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- pipe := rpc.svc.cfg.pipelines.Get(pipeline)
- if pipe == nil {
- return fmt.Errorf("undefined pipeline `%s`", pipeline)
- }
-
- if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
- return err
- }
-
- *w = "OK"
- return nil
-}
-
-// Resume job pipelines for a given pipeline.
-func (rpc *rpcServer) Resume(pipeline string, w *string) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- pipe := rpc.svc.cfg.pipelines.Get(pipeline)
- if pipe == nil {
- return fmt.Errorf("undefined pipeline `%s`", pipeline)
- }
-
- if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
- return err
- }
-
- *w = "OK"
- return nil
-}
-
-// Destroy job pipelines for a given pipeline.
-func (rpc *rpcServer) StopAll(stop bool, w *string) (err error) {
- if rpc.svc == nil || rpc.svc.rr == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- for _, pipe := range rpc.svc.cfg.pipelines {
- if err := rpc.svc.Consume(pipe, nil, nil); err != nil {
- return err
- }
- }
-
- *w = "OK"
- return nil
-}
-
-// Resume job pipelines for a given pipeline.
-func (rpc *rpcServer) ResumeAll(resume bool, w *string) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- for _, pipe := range rpc.svc.cfg.pipelines {
- if err := rpc.svc.Consume(pipe, rpc.svc.execPool, rpc.svc.error); err != nil {
- return err
- }
- }
-
- *w = "OK"
- return nil
-}
-
-// Workers returns list of pipelines workers and their stats.
-func (rpc *rpcServer) Workers(list bool, w *WorkerList) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- w.Workers, err = util.ServerState(rpc.svc.rr)
- return err
-}
-
-// Stat returns list of pipelines workers and their stats.
-func (rpc *rpcServer) Stat(list bool, l *PipelineList) (err error) {
- if rpc.svc == nil {
- return fmt.Errorf("jobs server is not running")
- }
-
- *l = PipelineList{}
- for _, p := range rpc.svc.cfg.pipelines {
- stat, err := rpc.svc.Stat(p)
- if err != nil {
- return err
- }
-
- l.Pipelines = append(l.Pipelines, stat)
- }
-
- return err
-}
diff --git a/plugins/jobs/oooold/rpc_test.go b/plugins/jobs/oooold/rpc_test.go
deleted file mode 100644
index a63b9ea2..00000000
--- a/plugins/jobs/oooold/rpc_test.go
+++ /dev/null
@@ -1,657 +0,0 @@
-package oooold
-
-import (
- "github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
- "github.com/stretchr/testify/assert"
- "io/ioutil"
- "syscall"
- "testing"
-)
-
-func TestRPC_StatPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, true, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_StatNonActivePipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, false, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_StatPipelineWithUndefinedBroker(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- list := &PipelineList{}
- assert.Error(t, cl.Call("jobs.Stat", true, &list))
-}
-
-func TestRPC_EnableConsuming(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- pipelineReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventPipeActive {
- close(pipelineReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- assert.NoError(t, cl.Call("jobs.Resume", "default", nil))
-
- <-pipelineReady
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, true, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_EnableConsumingUndefined(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5005"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
- ok := ""
- assert.Error(t, cl.Call("jobs.Resume", "undefined", &ok))
-}
-
-func TestRPC_EnableConsumingUndefinedBroker(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5005"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
- ok := ""
- assert.Error(t, cl.Call("jobs.Resume", "default", &ok))
-}
-
-func TestRPC_EnableConsumingAllUndefinedBroker(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5005"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
- ok := ""
- assert.Error(t, cl.Call("jobs.ResumeAll", true, &ok))
-}
-
-func TestRPC_DisableConsuming(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- pipelineReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventPipeStopped {
- close(pipelineReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- assert.NoError(t, cl.Call("jobs.Stop", "default", nil))
-
- <-pipelineReady
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, false, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_DisableConsumingUndefined(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- ok := ""
- assert.Error(t, cl.Call("jobs.Stop", "undefined", &ok))
-}
-
-func TestRPC_EnableAllConsuming(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- pipelineReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventPipeActive {
- close(pipelineReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- assert.NoError(t, cl.Call("jobs.ResumeAll", true, nil))
-
- <-pipelineReady
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, true, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_DisableAllConsuming(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- pipelineReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventPipeStopped {
- close(pipelineReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- assert.NoError(t, cl.Call("jobs.StopAll", true, nil))
-
- <-pipelineReady
-
- list := &PipelineList{}
- assert.NoError(t, cl.Call("jobs.Stat", true, &list))
-
- assert.Len(t, list.Pipelines, 1)
-
- assert.Equal(t, int64(0), list.Pipelines[0].Queue)
- assert.Equal(t, false, list.Pipelines[0].Consuming)
-}
-
-func TestRPC_DoJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventJobOK {
- close(jobReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- id := ""
- assert.NoError(t, cl.Call("jobs.Push", &Job{
- Job: "spiral.jobs.tests.local.job",
- Payload: `{"data":100}`,
- Options: &Options{},
- }, &id))
- assert.NoError(t, err)
-
- <-jobReady
-
- data, err := ioutil.ReadFile("tests/local.job")
- assert.NoError(t, err)
- defer syscall.Unlink("tests/local.job")
-
- assert.Contains(t, string(data), id)
-}
-
-func TestRPC_NoOperationOnDeadServer(t *testing.T) {
- rc := &rpcServer{nil}
-
- assert.Error(t, rc.Push(&Job{}, nil))
- assert.Error(t, rc.Reset(true, nil))
-
- assert.Error(t, rc.Stop("default", nil))
- assert.Error(t, rc.StopAll(true, nil))
-
- assert.Error(t, rc.Resume("default", nil))
- assert.Error(t, rc.ResumeAll(true, nil))
-
- assert.Error(t, rc.Workers(true, nil))
- assert.Error(t, rc.Stat(true, nil))
-}
-
-func TestRPC_Workers(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("rpc", &rpc.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "rpc":{"listen":"tcp://:5004"},
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- s2, _ := c.Get(rpc.ID)
- rs := s2.(*rpc.Service)
-
- cl, err := rs.Client()
- assert.NoError(t, err)
-
- list := &WorkerList{}
- assert.NoError(t, cl.Call("jobs.Workers", true, &list))
-
- assert.Len(t, list.Workers, 1)
-
- pid := list.Workers[0].Pid
- assert.NotEqual(t, 0, pid)
-
- // reset
- ok := ""
- assert.NoError(t, cl.Call("jobs.Reset", true, &ok))
-
- list = &WorkerList{}
- assert.NoError(t, cl.Call("jobs.Workers", true, &list))
-
- assert.Len(t, list.Workers, 1)
-
- assert.NotEqual(t, list.Workers[0].Pid, pid)
-}
diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go
deleted file mode 100644
index 7cfcff31..00000000
--- a/plugins/jobs/oooold/service.go
+++ /dev/null
@@ -1,327 +0,0 @@
-package oooold
-
-import (
- "fmt"
- //"github.com/sirupsen/logrus"
- //"github.com/spiral/roadrunner"
- //"github.com/spiral/roadrunner/service"
- //"github.com/spiral/roadrunner/service/env"
- //"github.com/spiral/roadrunner/service/rpc"
- "sync"
- "sync/atomic"
- "time"
-)
-
-// ID defines public service name.
-const ID = "jobs"
-
-// Service wraps roadrunner container and manage set of parent within it.
-type Service struct {
- // Associated parent
- Brokers map[string]Broker
-
- // brokers and routing config
- cfg *Config
-
- // environment, logger and listeners
- //env env.Environment
- //log *logrus.Logger
- lsn []func(event int, ctx interface{})
-
- // server and server controller
- //rr *roadrunner.Server
- //cr roadrunner.Controller
-
- // task balancer
- execPool chan Handler
-
- // registered brokers
- serving int32
- //brokers service.Container
-
- // pipelines pipelines
- mup sync.Mutex
- pipelines map[*Pipeline]bool
-}
-
-// Attach attaches cr. Currently only one cr is supported.
-func (svc *Service) Attach(ctr roadrunner.Controller) {
- svc.cr = ctr
-}
-
-// AddListener attaches event listeners to the service and all underlying brokers.
-func (svc *Service) AddListener(l func(event int, ctx interface{})) {
- svc.lsn = append(svc.lsn, l)
-}
-
-// Init configures job service.
-func (svc *Service) Init(
- cfg service.Config,
- log *logrus.Logger,
- env env.Environment,
- rpc *rpc.Service,
-) (ok bool, err error) {
- svc.cfg = &Config{}
- if err := svc.cfg.Hydrate(cfg); err != nil {
- return false, err
- }
-
- svc.env = env
- svc.log = log
-
- if rpc != nil {
- if err := rpc.Register(ID, &rpcServer{svc}); err != nil {
- return false, err
- }
- }
-
- // limit the number of parallel threads
- if svc.cfg.Workers.Command != "" {
- svc.execPool = make(chan Handler, svc.cfg.Workers.Pool.NumWorkers)
- for i := int64(0); i < svc.cfg.Workers.Pool.NumWorkers; i++ {
- svc.execPool <- svc.exec
- }
-
- svc.rr = roadrunner.NewServer(svc.cfg.Workers)
- }
-
- svc.pipelines = make(map[*Pipeline]bool)
- for _, p := range svc.cfg.pipelines {
- svc.pipelines[p] = false
- }
-
- // run all brokers in nested container
- //svc.brokers = service.NewContainer(log)
- //for name, b := range svc.Brokers {
- // svc.brokers.Register(name, b)
- // if ep, ok := b.(EventProvider); ok {
- // ep.Listen(svc.throw)
- // }
- //}
-
- // init all broker configs
- //if err := svc.brokers.Init(svc.cfg); err != nil {
- // return false, err
- //}
-
- // register all pipelines (per broker)
- //for name, b := range svc.Brokers {
- // for _, pipe := range svc.cfg.pipelines.Broker(name) {
- // if err := b.Register(pipe); err != nil {
- // return false, err
- // }
- // }
- //}
-
- return true, nil
-}
-
-// Serve serves local rr server and creates broker association.
-func (svc *Service) Serve() error {
- if svc.rr != nil {
- if svc.env != nil {
- if err := svc.env.Copy(svc.cfg.Workers); err != nil {
- return err
- }
- }
-
- // ensure that workers aware of running within jobs
- svc.cfg.Workers.SetEnv("rr_jobs", "true")
- svc.rr.Listen(svc.throw)
-
- if svc.cr != nil {
- svc.rr.Attach(svc.cr)
- }
-
- if err := svc.rr.Start(); err != nil {
- return err
- }
- defer svc.rr.Stop()
-
- // start pipelines of all the pipelines
- for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...) {
- // start pipeline consuming
- if err := svc.Consume(p, svc.execPool, svc.error); err != nil {
- svc.Stop()
-
- return err
- }
- }
- }
-
- atomic.StoreInt32(&svc.serving, 1)
- defer atomic.StoreInt32(&svc.serving, 0)
-
- return svc.brokers.Serve()
-}
-
-// Stop all pipelines and rr server.
-func (svc *Service) Stop() {
- if atomic.LoadInt32(&svc.serving) == 0 {
- return
- }
-
- wg := sync.WaitGroup{}
- for _, p := range svc.cfg.pipelines.Names(svc.cfg.Consume...).Reverse() {
- wg.Add(1)
-
- go func(p *Pipeline) {
- defer wg.Done()
- if err := svc.Consume(p, nil, nil); err != nil {
- svc.throw(EventPipeError, &PipelineError{Pipeline: p, Caused: err})
- }
- }(p)
- }
-
- wg.Wait()
- svc.brokers.Stop()
-}
-
-// Server returns associated rr server (if any).
-func (svc *Service) Server() *roadrunner.Server {
- return svc.rr
-}
-
-// Stat returns list of pipelines workers and their stats.
-func (svc *Service) Stat(pipe *Pipeline) (stat *Stat, err error) {
- b, ok := svc.Brokers[pipe.Broker()]
- if !ok {
- return nil, fmt.Errorf("undefined broker `%s`", pipe.Broker())
- }
-
- stat, err = b.Stat(pipe)
- if err != nil {
- return nil, err
- }
-
- stat.Pipeline = pipe.Name()
- stat.Broker = pipe.Broker()
-
- svc.mup.Lock()
- stat.Consuming = svc.pipelines[pipe]
- svc.mup.Unlock()
-
- return stat, err
-}
-
-// Consume enables or disables pipeline pipelines using given handlers.
-func (svc *Service) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
- svc.mup.Lock()
-
- if execPool != nil {
- if svc.pipelines[pipe] {
- svc.mup.Unlock()
- return nil
- }
-
- svc.throw(EventPipeConsume, pipe)
- svc.pipelines[pipe] = true
- } else {
- if !svc.pipelines[pipe] {
- svc.mup.Unlock()
- return nil
- }
-
- svc.throw(EventPipeStop, pipe)
- svc.pipelines[pipe] = false
- }
-
- broker, ok := svc.Brokers[pipe.Broker()]
- if !ok {
- svc.mup.Unlock()
- return fmt.Errorf("undefined broker `%s`", pipe.Broker())
- }
- svc.mup.Unlock()
-
- if err := broker.Consume(pipe, execPool, errHandler); err != nil {
- svc.mup.Lock()
- svc.pipelines[pipe] = false
- svc.mup.Unlock()
-
- svc.throw(EventPipeError, &PipelineError{Pipeline: pipe, Caused: err})
-
- return err
- }
-
- if execPool != nil {
- svc.throw(EventPipeActive, pipe)
- } else {
- svc.throw(EventPipeStopped, pipe)
- }
-
- return nil
-}
-
-// Push job to associated broker and return job id.
-func (svc *Service) Push(job *Job) (string, error) {
- pipe, pOpts, err := svc.cfg.MatchPipeline(job)
- if err != nil {
- return "", err
- }
-
- if pOpts != nil {
- job.Options.Merge(pOpts)
- }
-
- broker, ok := svc.Brokers[pipe.Broker()]
- if !ok {
- return "", fmt.Errorf("undefined broker `%s`", pipe.Broker())
- }
-
- id, err := broker.Push(pipe, job)
-
- if err != nil {
- svc.throw(EventPushError, &JobError{Job: job, Caused: err})
- } else {
- svc.throw(EventPushOK, &JobEvent{ID: id, Job: job})
- }
-
- return id, err
-}
-
-// exec executed job using local RR server. Make sure that service is started.
-func (svc *Service) exec(id string, j *Job) error {
- start := time.Now()
- svc.throw(EventJobStart, &JobEvent{ID: id, Job: j, start: start})
-
- // ignore response for now, possibly add more routing options
- _, err := svc.rr.Exec(&roadrunner.Payload{
- Body: j.Body(),
- Context: j.Context(id),
- })
-
- if err == nil {
- svc.throw(EventJobOK, &JobEvent{
- ID: id,
- Job: j,
- start: start,
- elapsed: time.Since(start),
- })
- } else {
- svc.throw(EventJobError, &JobError{
- ID: id,
- Job: j,
- Caused: err, start: start,
- elapsed: time.Since(start),
- })
- }
-
- return err
-}
-
-// register died job, can be used to move to fallback testQueue or log
-func (svc *Service) error(id string, j *Job, err error) {
- // nothing for now, possibly route to another pipeline
-}
-
-// throw handles service, server and pool events.
-func (svc *Service) throw(event int, ctx interface{}) {
- for _, l := range svc.lsn {
- l(event, ctx)
- }
-
- if event == roadrunner.EventServerFailure {
- // underlying rr server is dead, stop everything
- svc.Stop()
- }
-}
diff --git a/plugins/jobs/oooold/service_test.go b/plugins/jobs/oooold/service_test.go
deleted file mode 100644
index a8e0e56d..00000000
--- a/plugins/jobs/oooold/service_test.go
+++ /dev/null
@@ -1,458 +0,0 @@
-package oooold
-
-import (
- "bytes"
- "github.com/sirupsen/logrus"
- "github.com/spf13/viper"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/env"
- "github.com/stretchr/testify/assert"
- "io/ioutil"
- "syscall"
- "testing"
-)
-
-func viperConfig(cfg string) service.Config {
- v := viper.New()
- v.SetConfigType("json")
-
- err := v.ReadConfig(bytes.NewBuffer([]byte(cfg)))
- if err != nil {
- panic(err)
- }
-
- return &configWrapper{v}
-}
-
-// configWrapper provides interface bridge between v configs and service.Config.
-type configWrapper struct {
- v *viper.Viper
-}
-
-// Get nested config section (sub-map), returns nil if section not found.
-func (w *configWrapper) Get(key string) service.Config {
- sub := w.v.Sub(key)
- if sub == nil {
- return nil
- }
-
- return &configWrapper{sub}
-}
-
-// Unmarshal unmarshal config data into given struct.
-func (w *configWrapper) Unmarshal(out interface{}) error {
- return w.v.Unmarshal(out)
-}
-
-func jobs(container service.Container) *Service {
- svc, _ := container.Get("jobs")
- return svc.(*Service)
-}
-
-func TestService_Init(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-}
-
-func TestService_ServeStop(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("env", &env.Service{})
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- <-ready
- c.Stop()
-}
-
-func TestService_ServeError(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/bad-consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- assert.Error(t, c.Serve())
-}
-
-func TestService_GetPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- assert.Equal(t, "ephemeral", jobs(c).cfg.pipelines.Get("default").Broker())
-}
-
-func TestService_StatPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
- pipe := svc.cfg.pipelines.Get("default")
-
- stat, err := svc.Stat(pipe)
- assert.NoError(t, err)
-
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, true, stat.Consuming)
-}
-
-func TestService_StatNonConsumingPipeline(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
- pipe := svc.cfg.pipelines.Get("default")
-
- stat, err := svc.Stat(pipe)
- assert.NoError(t, err)
-
- assert.Equal(t, int64(0), stat.Queue)
- assert.Equal(t, false, stat.Consuming)
-}
-
-func TestService_DoJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobReady := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventJobOK {
- close(jobReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- id, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.job",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.NoError(t, err)
-
- <-jobReady
-
- data, err := ioutil.ReadFile("tests/local.job")
- assert.NoError(t, err)
- defer syscall.Unlink("tests/local.job")
-
- assert.Contains(t, string(data), id)
-}
-
-func TestService_DoUndefinedJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.undefined",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.Error(t, err)
-}
-
-func TestService_DoJobIntoInvalidBroker(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.job",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.Error(t, err)
-}
-
-func TestService_DoStatInvalidBroker(t *testing.T) {
- l := logrus.New()
- l.Level = logrus.FatalLevel
-
- c := service.NewContainer(l)
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"undefined"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": []
- }
-}`)))
-
- ready := make(chan interface{})
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Stat(svc.cfg.pipelines.Get("default"))
- assert.Error(t, err)
-}
-
-func TestService_DoErrorJob(t *testing.T) {
- c := service.NewContainer(logrus.New())
- c.Register("jobs", &Service{Brokers: map[string]Broker{"ephemeral": &testBroker{}}})
-
- assert.NoError(t, c.Init(viperConfig(`{
- "jobs":{
- "workers":{
- "command": "php tests/consumer.php",
- "pool.numWorkers": 1
- },
- "pipelines":{"default":{"broker":"ephemeral"}},
- "dispatch": {
- "spiral-jobs-tests-local-*.pipeline": "default"
- },
- "consume": ["default"]
- }
-}`)))
-
- ready := make(chan interface{})
- jobReady := make(chan interface{})
-
- var jobErr error
- jobs(c).AddListener(func(event int, ctx interface{}) {
- if event == EventBrokerReady {
- close(ready)
- }
-
- if event == EventJobError {
- jobErr = ctx.(error)
- close(jobReady)
- }
- })
-
- go func() { c.Serve() }()
- defer c.Stop()
- <-ready
-
- svc := jobs(c)
-
- _, err := svc.Push(&Job{
- Job: "spiral.jobs.tests.local.errorJob",
- Payload: `{"data":100}`,
- Options: &Options{},
- })
- assert.NoError(t, err)
-
- <-jobReady
- assert.Error(t, jobErr)
- assert.Contains(t, jobErr.Error(), "something is wrong")
-}
diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go
index b80e75d0..c1f958df 100644
--- a/plugins/jobs/pipeline_test.go
+++ b/plugins/jobs/pipeline_test.go
@@ -1,9 +1,10 @@
package jobs
import (
- "github.com/stretchr/testify/assert"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
func TestPipeline_Map(t *testing.T) {
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 42203871..cda2a711 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -1,10 +1,14 @@
package jobs
import (
+ "context"
+
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
)
const (
@@ -15,10 +19,12 @@ type Plugin struct {
cfg *Config
log logger.Logger
+ workersPool pool.Pool
+
consumers map[string]Consumer
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
const op = errors.Op("jobs_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -29,6 +35,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return errors.E(op, err)
}
+ p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, nil, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
p.consumers = make(map[string]Consumer)
p.log = log
return nil
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 5a0bbf4e..dbe7f808 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -5,4 +5,3 @@ import "github.com/spiral/roadrunner/v2/plugins/logger"
type rpc struct {
log logger.Logger
}
-
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
new file mode 100644
index 00000000..46434fa8
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+package kv.v1beta;
+option go_package = "./;jobsv1beta";
+
+message Request {
+ // could be an enum in the future
+ string storage = 1;
+ repeated Item items = 2;
+}
+
+message Item {
+ string key = 1;
+ bytes value = 2;
+ // RFC 3339
+ string timeout = 3;
+}
+
+// KV response for the KV RPC methods
+message Response {
+ repeated Item items = 1;
+}
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 622967b8..75578bff 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -7,10 +7,11 @@
package kvv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index ad4ebbe7..a2868118 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -7,10 +7,11 @@
package websocketsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (