summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum67
-rw-r--r--pkg/pool/config.go2
-rw-r--r--plugins/jobs/.rr.yaml73
-rw-r--r--plugins/jobs/brokers/amqp/config.go (renamed from plugins/jobs/oooold/broker/amqp/config.go)19
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go1
-rw-r--r--plugins/jobs/config.go51
-rw-r--r--plugins/jobs/dispatcher.go47
-rw-r--r--plugins/jobs/dispatcher_test.go53
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio2
-rw-r--r--plugins/jobs/interface.go8
-rw-r--r--plugins/jobs/job.go41
-rw-r--r--plugins/jobs/job_options.go70
-rw-r--r--plugins/jobs/job_options_test.go109
-rw-r--r--plugins/jobs/job_test.go18
-rw-r--r--plugins/jobs/oooold/broker/amqp/config_test.go27
-rw-r--r--plugins/jobs/oooold/rpc.go1
-rw-r--r--plugins/jobs/oooold/service.go34
-rw-r--r--plugins/jobs/pipeline.go172
-rw-r--r--plugins/jobs/pipeline_test.go89
-rw-r--r--plugins/jobs/plugin.go29
-rw-r--r--plugins/jobs/rpc.go8
22 files changed, 858 insertions, 65 deletions
diff --git a/go.mod b/go.mod
index 2ac9684c..7a12c9b9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/alicebob/miniredis/v2 v2.14.5
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
+ github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
@@ -22,6 +23,7 @@ require (
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.4
+ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index f218097f..d47fb696 100644
--- a/go.sum
+++ b/go.sum
@@ -15,9 +15,11 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
+github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjlRbWyNqkPsJ3Fg+tQZCbgeX1VGljbQY=
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
@@ -37,12 +39,18 @@ github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
+github.com/aws/aws-sdk-go v1.16.14/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
+github.com/beanstalkd/go-beanstalk v0.0.0-20180822062812-53ecdaa3bcfb/go.mod h1:Q3f6RCbUHp8RHSfBiPUZBojK76rir8Rl+KINuz2/sYs=
+github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w=
+github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -51,9 +59,11 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
+github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -69,14 +79,18 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -86,6 +100,7 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -105,12 +120,14 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis/v8 v8.9.0 h1:FTTbB7WqlXfVNdVv0SsxA+oVi0bAwit6bMe3IUucq2o=
@@ -120,6 +137,9 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofiber/fiber/v2 v2.10.0 h1:cYwonWaFVa7wBd/LKhgKu7mFNg2CHv5ztY6gzXtrvW8=
github.com/gofiber/fiber/v2 v2.10.0/go.mod h1:Ah3IJikrKNRepl/HuVawppS25X7FWohwfCSRn7kJG28=
+github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
+github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -166,6 +186,7 @@ github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -173,6 +194,7 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -207,16 +229,19 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
+github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@@ -229,7 +254,9 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/beanstalk v0.0.0-20180818045031-cae1762e4858/go.mod h1:S640fId9Ag4k2hh6Hwwj62pMSZqfMtg/kfKPeAOhET8=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -239,9 +266,12 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
@@ -249,11 +279,15 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
+github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
@@ -281,6 +315,7 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
+github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
@@ -321,6 +356,7 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
+github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.10.0 h1:/o0BDeWzLWXNZ+4q5gXltUvaMpJqckTa+jTNoB+z4cg=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
@@ -336,6 +372,7 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.18.0 h1:WCVKW7aL6LEe1uryfI9dnEc2ZqNB1Fn0ok930v0iL1Y=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
@@ -351,21 +388,27 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c h1:2nF5+FZ4/qp7pZVL7fR6DEaSTzuDmNaFTyqp92/hwF8=
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
+github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8=
github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
+github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
+github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -376,11 +419,15 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
+github.com/spf13/cobra v0.0.6/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
+github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
+github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.2 h1:gk7fh3OFP9h3JqCSEfZeBzxQr478Y3NPCcC1g/0hlbo=
@@ -388,9 +435,19 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h
github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go=
github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/goridge/v2 v2.3.0/go.mod h1:NxcCipXONzZ6smmLUl9SsnT/YJjIjDphIfbzRJICLd8=
+github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
+github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
+github.com/spiral/jobs/v2 v2.2.1 h1:9sZ+GgQf2HK8Kvb7Cbgfw4DSEwaprJB0yhzQYe7qr2I=
+github.com/spiral/jobs/v2 v2.2.1/go.mod h1:Ll4PCa8H1i0ikFw34HDJmizGZSfmm5DUmzsuAij9b1o=
+github.com/spiral/roadrunner v1.8.0/go.mod h1:74R7jkvz7L9Mp6n/8liQUDd5uyA5f7uo8XHPSQmeOfg=
+github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8=
+github.com/spiral/roadrunner v1.9.2/go.mod h1:Q1al1YGjs7ZHVkAA7+gUKM0rwk6XWG07G0UjyjjuK+0=
+github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
+github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -398,6 +455,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
@@ -408,6 +467,8 @@ github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefld
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
+github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -421,6 +482,8 @@ github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7Fw
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
github.com/yookoala/gofast v0.6.0 h1:E5x2acfUD7GkzCf8bmIMwnV10VxDy5tUCHc5LGhluwc=
github.com/yookoala/gofast v0.6.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -504,10 +567,12 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -556,6 +621,7 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -583,6 +649,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180726210403-bfb5194568d3/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config .. Pool config Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml
new file mode 100644
index 00000000..1b84515f
--- /dev/null
+++ b/plugins/jobs/.rr.yaml
@@ -0,0 +1,73 @@
+server:
+ command: "php worker.php"
+
+jobs:
+ # worker pool configuration
+ pool:
+ num_workers: 4
+
+ # rabbitmq and similar servers
+ amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+ beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+ sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+ # job destinations and options
+ dispatch:
+ spiral-jobs-tests-amqp-*.pipeline: amqp
+ spiral-jobs-tests-local-*.pipeline: local
+ spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
+ spiral-jobs-tests-sqs-*.pipeline: sqs
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ local:
+ broker: ephemeral
+
+ amqp:
+ broker: amqp
+ queue: default
+
+ beanstalk:
+ broker: beanstalk
+ tube: default
+
+ sqs:
+ broker: sqs
+ queue: default
+ declare:
+ MessageRetentionPeriod: 86400
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: ["local", "amqp", "beanstalk", "sqs"]
+
+
+# monitors rr server(s)
+limit:
+ # check worker state each second
+ interval: 1
+
+ # custom watch configuration for each service
+ services:
+ # monitor queue workers
+ jobs:
+ # maximum allowed memory consumption per worker (soft)
+ maxMemory: 100
+
+ # maximum time to live for the worker (soft)
+ TTL: 0
+
+ # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
+ idleTTL: 0
+
+ # max_execution_time (brutal)
+ execTTL: 60
diff --git a/plugins/jobs/oooold/broker/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
index 0ed3a50e..a60cb486 100644
--- a/plugins/jobs/oooold/broker/amqp/config.go
+++ b/plugins/jobs/brokers/amqp/config.go
@@ -1,10 +1,6 @@
package amqp
-import (
- "fmt"
- "github.com/spiral/roadrunner/service"
- "time"
-)
+import "time"
// Config defines sqs broker configuration.
type Config struct {
@@ -15,19 +11,6 @@ type Config struct {
Timeout int
}
-// Hydrate config values.
-func (c *Config) Hydrate(cfg service.Config) error {
- if err := cfg.Unmarshal(c); err != nil {
- return err
- }
-
- if c.Addr == "" {
- return fmt.Errorf("AMQP address is missing")
- }
-
- return nil
-}
-
// TimeoutDuration returns number of seconds allowed to redial
func (c *Config) TimeoutDuration() time.Duration {
timeout := c.Timeout
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
new file mode 100644
index 00000000..0e8d02ac
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -0,0 +1 @@
+package amqp
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..5c5ad400
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,51 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // Workers configures roadrunner server and worker busy.
+ //Workers *roadrunner.ServerConfig
+ pool poolImpl.Config
+
+ // Dispatch defines where and how to match jobs.
+ Dispatch map[string]*Options
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*Pipeline
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string
+
+ // parent config for broken options.
+ pipelines Pipelines
+ route Dispatcher
+}
+
+// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *Job) (*Pipeline, *Options, error) {
+ const op = errors.Op("config_match_pipeline")
+ opt := c.route.match(job)
+
+ pipe := ""
+ if job.Options != nil {
+ pipe = job.Options.Pipeline
+ }
+
+ if pipe == "" && opt != nil {
+ pipe = opt.Pipeline
+ }
+
+ if pipe == "" {
+ return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job))
+ }
+
+ if p := c.pipelines.Get(pipe); p != nil {
+ return p, opt, nil
+ }
+
+ return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe))
+}
diff --git a/plugins/jobs/dispatcher.go b/plugins/jobs/dispatcher.go
new file mode 100644
index 00000000..9fde8fac
--- /dev/null
+++ b/plugins/jobs/dispatcher.go
@@ -0,0 +1,47 @@
+package jobs
+
+import (
+ "strings"
+)
+
+var separators = []string{"/", "-", "\\"}
+
+// Dispatcher provides ability to automatically locate the pipeline for the specific job
+// and update job options (if none set).
+type Dispatcher map[string]*Options
+
+// pre-compile patterns
+func initDispatcher(routes map[string]*Options) Dispatcher {
+ dispatcher := make(Dispatcher)
+ for pattern, opts := range routes {
+ pattern = strings.ToLower(pattern)
+ pattern = strings.Trim(pattern, "-.*")
+
+ for _, s := range separators {
+ pattern = strings.Replace(pattern, s, ".", -1)
+ }
+
+ dispatcher[pattern] = opts
+ }
+
+ return dispatcher
+}
+
+// match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) match(job *Job) (found *Options) {
+ var best = 0
+
+ jobName := strings.ToLower(job.Job)
+ for pattern, opts := range dispatcher {
+ if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
+ found = opts
+ best = len(pattern)
+ }
+ }
+
+ if best == 0 {
+ return nil
+ }
+
+ return found
+}
diff --git a/plugins/jobs/dispatcher_test.go b/plugins/jobs/dispatcher_test.go
new file mode 100644
index 00000000..59e3fd4e
--- /dev/null
+++ b/plugins/jobs/dispatcher_test.go
@@ -0,0 +1,53 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_Map_All(t *testing.T) {
+ m := initDispatcher(map[string]*Options{"default": {Pipeline: "default"}})
+ assert.Equal(t, "default", m.match(&Job{Job: "default"}).Pipeline)
+}
+
+func Test_Map_Miss(t *testing.T) {
+ m := initDispatcher(map[string]*Options{"some.*": {Pipeline: "default"}})
+
+ assert.Nil(t, m.match(&Job{Job: "miss"}))
+}
+
+func Test_Map_Best(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestUpper(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.Other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.OTHER"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "Some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestReversed(t *testing.T) {
+ m := initDispatcher(map[string]*Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other.job"}).Pipeline)
+ assert.Equal(t, "other", m.match(&Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "default", m.match(&Job{Job: "some"}).Pipeline)
+}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
index 871df916..ee923d29 100644
--- a/plugins/jobs/doc/jobs_arch.drawio
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-16T08:42:30.343Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="TfWMGqp6GgpLWRWY4Wkn" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vtbd5s4EP41Pqf7EB/E3Y9x7Dbdbbdusnva7psAYWgBsUJO7Pz6lUBchRMaY7vp1j5J0OiCNPPNp9FAJtpVvH1DYBq8xx6KJqribSfaYqKqwFIA+8MlOyEBql5I1iT0hKwW3IYPSAgVId2EHspaDSnGEQ3TttDFSYJc2pJBQvB9u5mPo/ZdU7hGkuDWhZEs/RR6NCikqqIodcU1CtcB7dbEsGwtBFkAPXzfEGnLiXZFMKbFVby9QhFXX6mYot/rPbXVzAhK6JAOn5Nkew1wjO4eaGK9+Yj+UdyL0hx3MNqIJYvZ0l2pA4I3iYf4KMpEm98HIUW3KXR57T0zO5MFNI5YCbBLP4yiKxxhkvfVPIhs32XyjBL8DTVqTNdGjs9qxAQQoWi7d2mgUhjDGmKLoGTHmogO+kzouISZXeLnvmE0W8iChr1AaR0ogLKuBq9VyS6ENr9Hs5IekcegJYqY0ACvcQKjZS2dtzVdt3mHcSr0+xVRuhN+AjcUt7XP9EV2n3n/qVEWvzTrFlsxeFHaiVIxVz7Bxy3A1oM3xEWPrVssnEKyRvSRhntMSlAEaXjXnkifdUTXFQ7ZFCsoGIbRhoJpdSxcTEx0a/qLNFIHVOasM1KhC2mkS0LgrtEs5Q0yCVDVmp+PMe0sGNuGtAExVvpSYopd1wDjhV0TbefFJRgIS/1IsOyASTeHwXIsqACJ529WbCTlgv2s/r69Hpf1DWR7eh/r26qjmeZIrG93dGqoMusDvYf1tWOR/uysDtl0x8o5v88hldM5pD50n1D6UXCgR2pd9FRwOpVH/mLvI4DFPA5967OpPTMNA1iqDjRr1oaOBmbTWeNjnxRIcgj/HlI34MT+drV89/bP5ajkjgCjd6uP3GempcGxyL0b0ut95K72kPvRInrrnP4KWuRu/fDsbgx02EP99SCL2i+OgUHLnLV1fxyDHnqsO8ighkSFE9WM2HTnacvO5r8bnnaZx2w1YTLRLlmtkm7Z71xpSiG/oNykvE5v1DFOoxcwCtein8tUiUi72kMuJmy9WLThkCFRmKD61uxqLf7mE3RKwe/YYWq5vKVk49INQWUDphCn24nJ0q4sIF3Js9YeIZ8WlTav7E6bzTLvscjHJmGyrhZS3X8FdxGG3pPtPqRcU1nVrir3LbLjsHyTCV0YXRYGWeQmmwvzLIpFzDFr5Ud58o1vZnxPwwkVHgpUUX4N4zDinnONojvER310H/QN/u1NbeUfMWpDXnzG2R8ttZudUHr2R7Nnf+xmMUZzPvMncL4Sej+8/63CFBVresK5Fox4d0+2uqQUxSmtfTCPVfcv8hhLuuEobE+3MQtHUuup5vVXGCO8oc+Z1C+yKihHSqUOJSsAjsVWAEjmOVs4PziaV8+WPVUGhn+Hpk8Ps6kibUE3zHPRND8Qv/qKnd9GPQr7tovc3qdbjm3ohjKO96hm9yg8G3gUPlqeE6iSoglim2XSjN46iuZ7c1ubJfGJTVybd/kxDj2v8DuUhQ/QyYfiWhWPVNi4xnxiLPhYzNUywZGSQRKcoHFsoUlpCVu2xeyUWQkgp39SFhtMVJ7dxz+zLYyuX2imbIu+h77Hs4V8/mzEaRcsLoAxZxQRHbD4I2VKEzGZsQi56n3OPdZ8Yi3+L1br8SBwWhc6a2bvhWXigTkwEgD2gaHAs1LxjH3PmIoH8hk4RuyOXNc1Eexj5ReYlVfPH4qc95nrC3Nee6DzNt5QO6HzgrM+RyuV03DeN4hPfs49iieZfIJjrr0A5apYhxnbdRFPcDp5E54+uQuh8PZp0e/VyGcO31f7zxye6ZjGsRwd9O3SfY7efa9lvBfq5MPdapMFr+potzjfKcxX2e0bdhtR+2d6n1EZqn39aNqXXyDao96fI0q1uvjXZQsYpwxSVfnILakebygPMa6ql6EVOa/Hvq/5bedrAr0Q1XVCeR3AV817Ne3BLKg8qbTqO+igaMWsVKTgFw6mlPGmbPZ2+rNCR9P7ejKf5c7ObwmztFioH275PArAILK8QwVucowEMOUd4u2av6U+hfeZPt1k+b3GgEqZSaxePtZkqFi2DBXLPhZUNJkpr1dM8OnDzR/LGwk2pYJSgl2UZU+TogPdb+ucRj8UiNuzVZ0kPWbqHVe1dXtqDKTL789EsmL9zn4RetT/+6At/wM=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-21T13:57:33.772Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="eDaFKmf6xAQVYDDexY3m" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvrc5s4EP9rPONmJh7e4I9x7CTttVc3uZu2900GYWgBcUL4kb/+JBBPYcdNsJ32mkxra/VA+/jtrlZkoF6Hm1sMYu8DcmAwUCRnM1CnA0WRVVMz6ScjbXOSJUtSTlli3+HDKsKD/wg5sRiW+g5MGgMJQgHx4ybRRlEEbdKgAYzRujnMRUHzqTFYQoHwYINApH72HeIVu5MKPljPHfSXHhG6QlCM54TEAw5a10jqbKBeY4RI/i3cXMOASbAQTT7vZkdvuTcMI3LIhC9RtLmTUQhXjyQybz/BfyT7UtHyZVYgSDnTfLdkW0gBozRyIFtFGqiTtecT+BADm/WuqeYpzSNhQFsy/er6QXCNAoSzuaoDoOXalJ4QjL7DWo9hW3Dh0h6RD87aCmICNzUS5+sWUiYI3tIhvFe1uIy5oY3NwoLWldqUYpBXV1ihHcBNZVkuXomSfuHS/BHJCnKEDjUu3kSYeGiJIhDMKuqkKelqzHuEYi7fb5CQLUcKSAlqSp+KEG+/sPkjvWh+rfdNN3zxvLXlrZ06SFCKbbiPTc4nAXgJyZ6BfEEmhL0qxTAAxF81EdilHT51jny659IUNE1vmoJutjSc75RPq+NFWKllVPq4tVIuHGGlK4zBtjYsZgMSwaBKnp9vY+pZbGzjk5qJ0dbXwqbo98rAWGNbt7aT2qV8oFlqpzLLljGpxmFm2ZepFD618vP3c7qSdEn/zf9+uKOL5h90Q9vI7jcI6NBytK4gYCkL1TB6CgJmS8SaIgYBWesIAuqxYsD4rPiso7PE6o/hUzoaPrVDw4Z0GoAqbespc4pTAfS3M3+5sRgn8ubqeGSNDV2XTUWjB51x03SUdlJ5ZNMRc/gPgNge8+xv57P3b/+c9erOoUwdutnlzseGqYK+3Hk7p1e73LnS4c6PltKb50So3HDn5mvz5/qBEO0doS/SqPXT+Vy5oc5Ku2dTaO/nuhcpVBdc4UAxArr/SdzQs/Fvyuouk5Cy50cD9Yr2SvGG/p8JTcrpl4SplPVptT7qwMglCPwln2dT2ULc7HagjTDlF/ExzGRw4EewejT9tuSf2QYXBeEdWlCxXD0QnNokxbAYQAWyaE+itLhN83Cb8izeA+iSvNNine1t011mM6bZ2tiPliUj5fPnYBsg4Dw57mPMJJWU48p2F5MtwLKI4tsguMoVMs1UNuHqmeZMTBAd5QZZ9Y0FMxbTUEQ4QmWFt29A6AcMOXcwWEG26t446Orst7O2lf3wVWv0/Kef+GjI7fKE1BEfjY742C5j9AY+4xcAX2F6rx5/cz+GOU9PgGtKHe/2yVFXhMAwJhUGs1x1N5PHYOmeWWFzu7VdLASxnmpff/khRCl5zqZ+O6us1xJqqYc6K1k+lreSZUE9Z0vnD87mlVOVT6UD07/e66cv06kkhKB7ilw4yg7Ew29o8abXo7Br2dDuvN5aWLqm79XB4eiR9fZReHzgUfholU1ZEQSNIQ2WUT17awmaxeamNAvHx4O4Omn7x9B3nBx3MPEfwSJbikmV36nQdfXJQJ+ytSjUEu4jBYVEKIL96EIRyhKWqIvxKasSslj+iWluMFBYeR/9yrrQ2rhQDFEXXbe+x9OFeP6s5WmXNC8AIfMoPDug+UdMhcZzMn3qM9G7zPeYk4E5/b9orQNB8mkhdNbK3uuuvRc19acLQVbfqcCziu9yO+Yd+95GPPWGkD6RibuC/i4//BPW4eXzJx/nvVd93XC1DoRrobEzw1U68V1ZIZ8aXG8h2+6EYYgVklyMQiZAD2bML/2ERlbIipiLbAgrkax8wPE9yucNez5XuK7Sfa5wjIWhHwvaUlck7oJ2++WV/t6aEw9w8zTxhlVGm5/hJIpO+via3nqU/nleWrTGh0pfO5r0xbeEdoj318hEjbb9a6IG9FMmoop4rBZEj1LCkorr8p1nSazd0d8b9tjJEgPHh1UfF17L4MvhnZJ2QOKVSCq0+h4sYDCnWsrL7NMFIoT6TVHtzRJnaR119HVUN4tYzh4Jkjhn1PU3bB+5wUA8W8HcbjIb8UDMJoSbJXsbfQTWiTZKk+xZfZiKJDdNxVRFUzEt0VQKWv+mooqe8m5OCZ8/3v8xuxfMphBQjJENk+Rpp7gA9vdl5kY/5ha3I1SdpASmqy2ompo10g90lz1UGx8N11mr1o32Pbz6eptu5ub47lLUwOnuuzousvLSQiC2qrJCq/7Qau240SgvvajLSdIwS5KGeS705kx3Y52iEi5z9skxGyVI8YEAMrwoSjYszxhe0IQjGbGOPP+AGKMG3/1cQR2Loyx7uqiKUIyFi3d5DjUsLgOf4muHK8GZW677DPHqqsvJ7AB/h4vYUxIftxxC+QJnzR2UrwQ33MEz0ifarP5QJz+KVH/zpM7+Aw==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
new file mode 100644
index 00000000..060326c8
--- /dev/null
+++ b/plugins/jobs/interface.go
@@ -0,0 +1,8 @@
+package jobs
+
+
+// todo naming
+type Consumer interface {
+ Push()
+ Stat()
+}
diff --git a/plugins/jobs/job.go b/plugins/jobs/job.go
new file mode 100644
index 00000000..8458b25b
--- /dev/null
+++ b/plugins/jobs/job.go
@@ -0,0 +1,41 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+//// Handler handles job execution.
+//type Handler func(id string, j *Job) error
+//
+//// ErrorHandler handles job execution errors.
+//type ErrorHandler func(id string, j *Job, err error)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Body packs job payload into binary payload.
+func (j *Job) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Job) Context(id string) []byte {
+ ctx, _ := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ }{ID: id, Job: j.Job},
+ )
+
+ return ctx
+}
diff --git a/plugins/jobs/job_options.go b/plugins/jobs/job_options.go
new file mode 100644
index 00000000..d4c6f0d2
--- /dev/null
+++ b/plugins/jobs/job_options.go
@@ -0,0 +1,70 @@
+package jobs
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int `json:"delay,omitempty"`
+
+ // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
+ // Minimum valuable value is 2.
+ Attempts int `json:"maxAttempts,omitempty"`
+
+ // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
+ RetryDelay int `json:"retryDelay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int `json:"timeout,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Attempts == 0 {
+ o.Attempts = from.Attempts
+ }
+
+ if o.Timeout == 0 {
+ o.Timeout = from.Timeout
+ }
+
+ if o.RetryDelay == 0 {
+ o.RetryDelay = from.RetryDelay
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
diff --git a/plugins/jobs/job_options_test.go b/plugins/jobs/job_options_test.go
new file mode 100644
index 00000000..8caaa935
--- /dev/null
+++ b/plugins/jobs/job_options_test.go
@@ -0,0 +1,109 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestOptions_CanRetry(t *testing.T) {
+ opts := &Options{Attempts: 0}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_SameValue(t *testing.T) {
+ opts := &Options{Attempts: 1}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_Value(t *testing.T) {
+ opts := &Options{Attempts: 2}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_CanRetry_Value3(t *testing.T) {
+ opts := &Options{Attempts: 3}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.True(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_RetryDuration(t *testing.T) {
+ opts := &Options{RetryDelay: 0}
+ assert.Equal(t, time.Duration(0), opts.RetryDuration())
+}
+
+func TestOptions_RetryDuration2(t *testing.T) {
+ opts := &Options{RetryDelay: 1}
+ assert.Equal(t, time.Second, opts.RetryDuration())
+}
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_TimeoutDuration(t *testing.T) {
+ opts := &Options{Timeout: 0}
+ assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
+}
+
+func TestOptions_TimeoutDuration2(t *testing.T) {
+ opts := &Options{Timeout: 1}
+ assert.Equal(t, time.Second, opts.TimeoutDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, 1, opts.Attempts)
+ assert.Equal(t, 2, opts.Delay)
+ assert.Equal(t, 1, opts.Timeout)
+ assert.Equal(t, 1, opts.RetryDelay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ Timeout: 10,
+ Attempts: 10,
+ RetryDelay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, 10, opts.Attempts)
+ assert.Equal(t, 10, opts.Delay)
+ assert.Equal(t, 10, opts.Timeout)
+ assert.Equal(t, 10, opts.RetryDelay)
+}
diff --git a/plugins/jobs/job_test.go b/plugins/jobs/job_test.go
new file mode 100644
index 00000000..e1938eca
--- /dev/null
+++ b/plugins/jobs/job_test.go
@@ -0,0 +1,18 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestJob_Body(t *testing.T) {
+ j := &Job{Payload: "hello"}
+
+ assert.Equal(t, []byte("hello"), j.Body())
+}
+
+func TestJob_Context(t *testing.T) {
+ j := &Job{Job: "job"}
+
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+}
diff --git a/plugins/jobs/oooold/broker/amqp/config_test.go b/plugins/jobs/oooold/broker/amqp/config_test.go
deleted file mode 100644
index 1abbb55d..00000000
--- a/plugins/jobs/oooold/broker/amqp/config_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package amqp
-
-import (
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/service"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-type mockCfg struct{ cfg string }
-
-func (cfg *mockCfg) Get(name string) service.Config { return nil }
-func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
-
-func Test_Config_Hydrate_Error(t *testing.T) {
- cfg := &mockCfg{`{"dead`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
-
-func Test_Config_Hydrate_Error2(t *testing.T) {
- cfg := &mockCfg{`{"addr":""}`}
- c := &Config{}
-
- assert.Error(t, c.Hydrate(cfg))
-}
diff --git a/plugins/jobs/oooold/rpc.go b/plugins/jobs/oooold/rpc.go
index 42730a68..cc61fb7d 100644
--- a/plugins/jobs/oooold/rpc.go
+++ b/plugins/jobs/oooold/rpc.go
@@ -2,7 +2,6 @@ package oooold
import (
"fmt"
- "github.com/spiral/roadrunner/util"
)
type rpcServer struct{ svc *Service }
diff --git a/plugins/jobs/oooold/service.go b/plugins/jobs/oooold/service.go
index 4244ed1a..7cfcff31 100644
--- a/plugins/jobs/oooold/service.go
+++ b/plugins/jobs/oooold/service.go
@@ -91,27 +91,27 @@ func (svc *Service) Init(
}
// run all brokers in nested container
- svc.brokers = service.NewContainer(log)
- for name, b := range svc.Brokers {
- svc.brokers.Register(name, b)
- if ep, ok := b.(EventProvider); ok {
- ep.Listen(svc.throw)
- }
- }
+ //svc.brokers = service.NewContainer(log)
+ //for name, b := range svc.Brokers {
+ // svc.brokers.Register(name, b)
+ // if ep, ok := b.(EventProvider); ok {
+ // ep.Listen(svc.throw)
+ // }
+ //}
// init all broker configs
- if err := svc.brokers.Init(svc.cfg); err != nil {
- return false, err
- }
+ //if err := svc.brokers.Init(svc.cfg); err != nil {
+ // return false, err
+ //}
// register all pipelines (per broker)
- for name, b := range svc.Brokers {
- for _, pipe := range svc.cfg.pipelines.Broker(name) {
- if err := b.Register(pipe); err != nil {
- return false, err
- }
- }
- }
+ //for name, b := range svc.Brokers {
+ // for _, pipe := range svc.cfg.pipelines.Broker(name) {
+ // if err := b.Register(pipe); err != nil {
+ // return false, err
+ // }
+ // }
+ //}
return true, nil
}
diff --git a/plugins/jobs/pipeline.go b/plugins/jobs/pipeline.go
new file mode 100644
index 00000000..bfd2e18c
--- /dev/null
+++ b/plugins/jobs/pipeline.go
@@ -0,0 +1,172 @@
+package jobs
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+)
+
+// Pipelines is list of Pipeline.
+
+type Pipelines []*Pipeline
+
+func initPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+ const op = errors.Op("pipeline_init")
+ out := make(Pipelines, 0)
+
+ for name, pipe := range pipes {
+ if pipe.Broker() == "" {
+ return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
+ }
+
+ p := pipe.With("name", name)
+ out = append(out, &p)
+ }
+
+ return out, nil
+}
+
+// Reverse returns pipelines in reversed order.
+func (ps Pipelines) Reverse() Pipelines {
+ out := make(Pipelines, len(ps))
+
+ for i, p := range ps {
+ out[len(ps)-i-1] = p
+ }
+
+ return out
+}
+
+// Broker return pipelines associated with specific broker.
+func (ps Pipelines) Broker(broker string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, p := range ps {
+ if p.Broker() != broker {
+ continue
+ }
+
+ out = append(out, p)
+ }
+
+ return out
+}
+
+// Names returns only pipelines with specified names.
+func (ps Pipelines) Names(only ...string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, name := range only {
+ for _, p := range ps {
+ if p.Name() == name {
+ out = append(out, p)
+ }
+ }
+ }
+
+ return out
+}
+
+// Get returns pipeline by it'svc name.
+func (ps Pipelines) Get(name string) *Pipeline {
+ // possibly optimize
+ for _, p := range ps {
+ if p.Name() == name {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+// With pipeline value. Immutable.
+func (p Pipeline) With(name string, value interface{}) Pipeline {
+ out := make(map[string]interface{})
+ for k, v := range p {
+ out[k] = v
+ }
+ out[name] = value
+
+ return out
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String("name", "")
+}
+
+// Broker associated with the pipeline.
+func (p Pipeline) Broker() string {
+ return p.String("broker", "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// Map must return nested map value or empty config.
+func (p Pipeline) Map(name string) Pipeline {
+ out := make(map[string]interface{})
+
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+
+ return out
+}
+
+// Bool must return option value as string or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if b, ok := value.(bool); ok {
+ return b
+ }
+ }
+
+ return d
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Integer must return option value as string or return default value.
+func (p Pipeline) Integer(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Duration must return option value as time.Duration (seconds) or return default value.
+func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return time.Second * time.Duration(str)
+ }
+ }
+
+ return d
+}
diff --git a/plugins/jobs/pipeline_test.go b/plugins/jobs/pipeline_test.go
new file mode 100644
index 00000000..b80e75d0
--- /dev/null
+++ b/plugins/jobs/pipeline_test.go
@@ -0,0 +1,89 @@
+package jobs
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestPipeline_Map(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
+ assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
+}
+
+func TestPipeline_MapString(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}
+
+ assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
+ assert.Equal(t, "", pipe.Map("other").String("alias", ""))
+}
+
+func TestPipeline_Bool(t *testing.T) {
+ pipe := Pipeline{"value": true}
+
+ assert.Equal(t, true, pipe.Bool("value", false))
+ assert.Equal(t, true, pipe.Bool("other", true))
+}
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Integer(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, 1, pipe.Integer("value", 0))
+ assert.Equal(t, 1, pipe.Integer("other", 1))
+}
+
+func TestPipeline_Duration(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, time.Second, pipe.Duration("value", 0))
+ assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
+
+func TestPipeline_FilterBroker(t *testing.T) {
+ pipes := Pipelines{
+ &Pipeline{"name": "first", "broker": "a"},
+ &Pipeline{"name": "second", "broker": "a"},
+ &Pipeline{"name": "third", "broker": "b"},
+ &Pipeline{"name": "forth", "broker": "b"},
+ }
+
+ filtered := pipes.Names("first", "third")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[0].Broker())
+ assert.Equal(t, "b", filtered[1].Broker())
+
+ filtered = pipes.Names("first", "third").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[1].Broker())
+ assert.Equal(t, "b", filtered[0].Broker())
+
+ filtered = pipes.Broker("a")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[0].Name())
+ assert.Equal(t, "second", filtered[1].Name())
+
+ filtered = pipes.Broker("a").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[1].Name())
+ assert.Equal(t, "second", filtered[0].Name())
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index e708d0ca..42203871 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -1,6 +1,7 @@
package jobs
import (
+ endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -11,11 +12,25 @@ const (
)
type Plugin struct {
+ cfg *Config
+ log logger.Logger
+
+ consumers map[string]Consumer
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ p.consumers = make(map[string]Consumer)
+ p.log = log
return nil
}
@@ -29,8 +44,22 @@ func (p *Plugin) Stop() error {
return nil
}
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) {
+ p.consumers[name.Name()] = c
+}
+
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
return PluginName
}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{log: p.log}
+}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..5a0bbf4e
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,8 @@
+package jobs
+
+import "github.com/spiral/roadrunner/v2/plugins/logger"
+
+type rpc struct {
+ log logger.Logger
+}
+