summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.dockerignore2
-rwxr-xr-xCODE_OF_CONDUCT.md2
-rw-r--r--common/doc.go9
-rw-r--r--common/kv/interface.go (renamed from plugins/kv/interface.go)0
-rw-r--r--common/pubsub/interface.go (renamed from pkg/pubsub/interface.go)0
-rw-r--r--common/pubsub/psmessage.go (renamed from pkg/pubsub/psmessage.go)0
-rw-r--r--go.mod2
-rw-r--r--go.sum67
-rwxr-xr-xpkg/events/general.go2
-rw-r--r--pkg/events/interface.go4
-rw-r--r--pkg/events/jobs_events.go84
-rw-r--r--pkg/events/pool_events.go2
-rw-r--r--pkg/events/worker_events.go2
-rw-r--r--pkg/pool/config.go2
-rw-r--r--plugins/broadcast/interface.go2
-rw-r--r--plugins/broadcast/plugin.go2
-rw-r--r--plugins/broadcast/rpc.go2
-rw-r--r--plugins/jobs/.rr.yaml73
-rw-r--r--plugins/jobs/brokers/amqp/config.go22
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/config.go1
-rw-r--r--plugins/jobs/brokers/ephemeral/plugin.go8
-rw-r--r--plugins/jobs/config.go64
-rw-r--r--plugins/jobs/dispatcher/dispatcher.go49
-rw-r--r--plugins/jobs/dispatcher/dispatcher_test.go55
-rw-r--r--plugins/jobs/doc/jobs_arch.drawio1
-rw-r--r--plugins/jobs/interface.go14
-rw-r--r--plugins/jobs/pipeline/pipeline.go172
-rw-r--r--plugins/jobs/pipeline/pipeline_test.go90
-rw-r--r--plugins/jobs/plugin.go113
-rw-r--r--plugins/jobs/rpc.go20
-rw-r--r--plugins/jobs/structs/job.go35
-rw-r--r--plugins/jobs/structs/job_options.go70
-rw-r--r--plugins/jobs/structs/job_options_test.go110
-rw-r--r--plugins/jobs/structs/job_test.go19
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go9
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go9
-rw-r--r--plugins/kv/plugin.go12
-rw-r--r--plugins/kv/rpc.go3
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go5
-rw-r--r--plugins/memory/pubsub.go2
-rw-r--r--plugins/redis/channel.go2
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go4
-rw-r--r--plugins/redis/pubsub.go2
-rw-r--r--plugins/server/plugin.go27
-rw-r--r--plugins/websockets/executor/executor.go2
-rw-r--r--plugins/websockets/plugin.go2
-rw-r--r--plugins/websockets/pool/workers_pool.go2
-rw-r--r--proto/jobs/v1beta/jobs.proto22
-rw-r--r--proto/kv/v1beta/kv.pb.go5
-rw-r--r--proto/websockets/v1beta/websockets.pb.go5
-rw-r--r--tests/composer.json2
-rw-r--r--tests/docker-compose-jobs.yml22
-rw-r--r--tests/plugins/broadcast/plugins/plugin1.go2
-rw-r--r--tests/plugins/broadcast/plugins/plugin2.go2
-rw-r--r--tests/plugins/broadcast/plugins/plugin3.go2
-rw-r--r--tests/plugins/broadcast/plugins/plugin4.go2
-rw-r--r--tests/plugins/broadcast/plugins/plugin5.go2
-rw-r--r--tests/plugins/broadcast/plugins/plugin6.go2
-rw-r--r--tests/plugins/headers/configs/.rr-cors-headers.yaml4
-rw-r--r--tests/psr-worker-bench.php2
-rw-r--r--tests/psr-worker.php2
-rw-r--r--tests/worker-cors.php15
67 files changed, 1213 insertions, 68 deletions
diff --git a/.dockerignore b/.dockerignore
index bfa82a3d..b817b3c8 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -7,4 +7,4 @@
/tests
/bin
composer.json
-vendor_php \ No newline at end of file
+vendor_php
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
index ae0b283a..49aeb3c8 100755
--- a/CODE_OF_CONDUCT.md
+++ b/CODE_OF_CONDUCT.md
@@ -43,4 +43,4 @@ Project maintainers who do not follow or enforce the Code of Conduct in good fai
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version]
[homepage]: http://contributor-covenant.org
-[version]: http://contributor-covenant.org/version/1/4/
+[version]: https://www.contributor-covenant.org/version/2/0/code_of_conduct/
diff --git a/common/doc.go b/common/doc.go
new file mode 100644
index 00000000..adc03351
--- /dev/null
+++ b/common/doc.go
@@ -0,0 +1,9 @@
+/*
+Package common used to collect common interfaces/structures which might be implemented (or imported) by a different plugins.
+For example, 'pubsub' interface might be implemented by memory, redis, websockets and many other plugins.
+
+Folders:
+- kv - contains KV interfaces and structures
+- pubsub - contains pub-sub interfaces and structures
+*/
+package common
diff --git a/plugins/kv/interface.go b/common/kv/interface.go
index ffdbbe62..ffdbbe62 100644
--- a/plugins/kv/interface.go
+++ b/common/kv/interface.go
diff --git a/pkg/pubsub/interface.go b/common/pubsub/interface.go
index 06252d70..06252d70 100644
--- a/pkg/pubsub/interface.go
+++ b/common/pubsub/interface.go
diff --git a/pkg/pubsub/psmessage.go b/common/pubsub/psmessage.go
index e33d9284..e33d9284 100644
--- a/pkg/pubsub/psmessage.go
+++ b/common/pubsub/psmessage.go
diff --git a/go.mod b/go.mod
index 2ac9684c..7a12c9b9 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/alicebob/miniredis/v2 v2.14.5
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
+ github.com/cenkalti/backoff/v4 v4.1.1
github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-ole/go-ole v1.2.5 // indirect
@@ -22,6 +23,7 @@ require (
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.4
+ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index f218097f..d47fb696 100644
--- a/go.sum
+++ b/go.sum
@@ -15,9 +15,11 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
+github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjlRbWyNqkPsJ3Fg+tQZCbgeX1VGljbQY=
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
@@ -37,12 +39,18 @@ github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
+github.com/aws/aws-sdk-go v1.16.14/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
+github.com/beanstalkd/go-beanstalk v0.0.0-20180822062812-53ecdaa3bcfb/go.mod h1:Q3f6RCbUHp8RHSfBiPUZBojK76rir8Rl+KINuz2/sYs=
+github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w=
+github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -51,9 +59,11 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
+github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -69,14 +79,18 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -86,6 +100,7 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -105,12 +120,14 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
+github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis/v8 v8.9.0 h1:FTTbB7WqlXfVNdVv0SsxA+oVi0bAwit6bMe3IUucq2o=
@@ -120,6 +137,9 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofiber/fiber/v2 v2.10.0 h1:cYwonWaFVa7wBd/LKhgKu7mFNg2CHv5ztY6gzXtrvW8=
github.com/gofiber/fiber/v2 v2.10.0/go.mod h1:Ah3IJikrKNRepl/HuVawppS25X7FWohwfCSRn7kJG28=
+github.com/gofrs/uuid v3.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
+github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -166,6 +186,7 @@ github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
+github.com/gopherjs/gopherjs v0.0.0-20180825215210-0210a2f0f73c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 h1:l5lAOZEym3oK3SQ2HBHWsJUfbNBiTXJDeW2QDxw9AQ0=
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -173,6 +194,7 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -207,16 +229,19 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
+github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
@@ -229,7 +254,9 @@ github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdY
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/beanstalk v0.0.0-20180818045031-cae1762e4858/go.mod h1:S640fId9Ag4k2hh6Hwwj62pMSZqfMtg/kfKPeAOhET8=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -239,9 +266,12 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
@@ -249,11 +279,15 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
+github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
+github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
@@ -281,6 +315,7 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
+github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
@@ -321,6 +356,7 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
+github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.10.0 h1:/o0BDeWzLWXNZ+4q5gXltUvaMpJqckTa+jTNoB+z4cg=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
@@ -336,6 +372,7 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.18.0 h1:WCVKW7aL6LEe1uryfI9dnEc2ZqNB1Fn0ok930v0iL1Y=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
@@ -351,21 +388,27 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c h1:2nF5+FZ4/qp7pZVL7fR6DEaSTzuDmNaFTyqp92/hwF8=
github.com/savsgio/gotils v0.0.0-20200608150037-a5f6f5aef16c/go.mod h1:TWNAOTaVzGOXq8RbEvHnhzA/A2sLZzgn0m6URjnukY8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
+github.com/shirou/gopsutil v2.20.7+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.3+incompatible h1:uenXGGa8ESCQq+dbgtl916dmg6PSAz2cXov0uORQ9v8=
github.com/shirou/gopsutil v3.21.3+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
+github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
+github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -376,11 +419,15 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
+github.com/spf13/cobra v0.0.6/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
+github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
+github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
+github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.2 h1:gk7fh3OFP9h3JqCSEfZeBzxQr478Y3NPCcC1g/0hlbo=
@@ -388,9 +435,19 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h
github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go=
github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/goridge/v2 v2.3.0/go.mod h1:NxcCipXONzZ6smmLUl9SsnT/YJjIjDphIfbzRJICLd8=
+github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
+github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
+github.com/spiral/jobs/v2 v2.2.1 h1:9sZ+GgQf2HK8Kvb7Cbgfw4DSEwaprJB0yhzQYe7qr2I=
+github.com/spiral/jobs/v2 v2.2.1/go.mod h1:Ll4PCa8H1i0ikFw34HDJmizGZSfmm5DUmzsuAij9b1o=
+github.com/spiral/roadrunner v1.8.0/go.mod h1:74R7jkvz7L9Mp6n/8liQUDd5uyA5f7uo8XHPSQmeOfg=
+github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8=
+github.com/spiral/roadrunner v1.9.2/go.mod h1:Q1al1YGjs7ZHVkAA7+gUKM0rwk6XWG07G0UjyjjuK+0=
+github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
+github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -398,6 +455,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
@@ -408,6 +467,8 @@ github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefld
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
+github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -421,6 +482,8 @@ github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7Fw
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/yookoala/gofast v0.4.0/go.mod h1:rfbkoKaQG1bnuTUZcmV3vAlnfpF4FTq8WbQJf2vcpg8=
github.com/yookoala/gofast v0.6.0 h1:E5x2acfUD7GkzCf8bmIMwnV10VxDy5tUCHc5LGhluwc=
github.com/yookoala/gofast v0.6.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -504,10 +567,12 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -556,6 +621,7 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -583,6 +649,7 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20180726210403-bfb5194568d3/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/pkg/events/general.go b/pkg/events/general.go
index a09a8759..5cf13e10 100755
--- a/pkg/events/general.go
+++ b/pkg/events/general.go
@@ -4,6 +4,8 @@ import (
"sync"
)
+const UnknownEventType string = "Unknown event type"
+
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
listeners []Listener
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
index ac6c15a4..7d57e4d0 100644
--- a/pkg/events/interface.go
+++ b/pkg/events/interface.go
@@ -2,7 +2,7 @@ package events
// Handler interface
type Handler interface {
- // Return number of active listeners
+ // NumListeners return number of active listeners
NumListeners() int
// AddListener adds lister to the publisher
AddListener(listener Listener)
@@ -10,5 +10,5 @@ type Handler interface {
Push(e interface{})
}
-// Event listener listens for the events produced by worker, worker pool or other service.
+// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
type Listener func(event interface{})
diff --git a/pkg/events/jobs_events.go b/pkg/events/jobs_events.go
new file mode 100644
index 00000000..ed07c7da
--- /dev/null
+++ b/pkg/events/jobs_events.go
@@ -0,0 +1,84 @@
+package events
+
+import (
+ "time"
+)
+
+const (
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK = iota + 12000
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeConsume when pipeline pipelines has been requested.
+ EventPipeConsume
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStop when pipeline has begun stopping.
+ EventPipeStop
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventBrokerReady thrown when broken is ready to accept/serve tasks.
+ EventBrokerReady
+)
+
+type J int64
+
+func (ev J) String() string {
+ switch ev {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeConsume:
+ return "EventPipeConsume"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStop:
+ return "EventPipeStop"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventBrokerReady:
+ return "EventBrokerReady"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type JobEvent struct {
+ Event J
+ // String is job id.
+ ID string
+
+ // Job is failed job.
+ Job interface{} // this is *jobs.Job, but interface used to avoid package import
+
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index e7b451e0..4d4cae5d 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -57,7 +57,7 @@ func (ev P) String() string {
case EventPoolRestart:
return "EventPoolRestart"
}
- return "Unknown event type"
+ return UnknownEventType
}
// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 11bd6ab7..39c38e57 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -20,7 +20,7 @@ func (ev W) String() string {
case EventWorkerStderr:
return "EventWorkerStderr"
}
- return "Unknown event type"
+ return UnknownEventType
}
// WorkerEvent wraps worker events.
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 2a3dabe4..3a058956 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behavior.
+// Config .. Pool config Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
index 46709d71..eda3572f 100644
--- a/plugins/broadcast/interface.go
+++ b/plugins/broadcast/interface.go
@@ -1,6 +1,6 @@
package broadcast
-import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+import "github.com/spiral/roadrunner/v2/common/pubsub"
type Broadcaster interface {
GetDriver(key string) (pubsub.SubReader, error)
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 6ddef806..889dc2fa 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 2ee211f8..475076a0 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,7 +2,7 @@ package broadcast
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
diff --git a/plugins/jobs/.rr.yaml b/plugins/jobs/.rr.yaml
new file mode 100644
index 00000000..1b84515f
--- /dev/null
+++ b/plugins/jobs/.rr.yaml
@@ -0,0 +1,73 @@
+server:
+ command: "php worker.php"
+
+jobs:
+ # worker pool configuration
+ pool:
+ num_workers: 4
+
+ # rabbitmq and similar servers
+ amqp:
+ addr: amqp://guest:guest@localhost:5672/
+
+ # beanstalk configuration
+ beanstalk:
+ addr: tcp://localhost:11300
+
+ # amazon sqs configuration
+ sqs:
+ key: api-key
+ secret: api-secret
+ region: us-west-1
+ endpoint: http://localhost:9324
+
+ # job destinations and options
+ dispatch:
+ spiral-jobs-tests-amqp-*.pipeline: amqp
+ spiral-jobs-tests-local-*.pipeline: local
+ spiral-jobs-tests-beanstalk-*.pipeline: beanstalk
+ spiral-jobs-tests-sqs-*.pipeline: sqs
+
+ # list of broker pipelines associated with endpoints
+ pipelines:
+ local:
+ broker: ephemeral
+
+ amqp:
+ broker: amqp
+ queue: default
+
+ beanstalk:
+ broker: beanstalk
+ tube: default
+
+ sqs:
+ broker: sqs
+ queue: default
+ declare:
+ MessageRetentionPeriod: 86400
+
+ # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
+ consume: ["local", "amqp", "beanstalk", "sqs"]
+
+
+# monitors rr server(s)
+limit:
+ # check worker state each second
+ interval: 1
+
+ # custom watch configuration for each service
+ services:
+ # monitor queue workers
+ jobs:
+ # maximum allowed memory consumption per worker (soft)
+ maxMemory: 100
+
+ # maximum time to live for the worker (soft)
+ TTL: 0
+
+ # maximum allowed amount of time worker can spend in idle before being removed (for weak db connections, soft)
+ idleTTL: 0
+
+ # max_execution_time (brutal)
+ execTTL: 60
diff --git a/plugins/jobs/brokers/amqp/config.go b/plugins/jobs/brokers/amqp/config.go
new file mode 100644
index 00000000..a60cb486
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/config.go
@@ -0,0 +1,22 @@
+package amqp
+
+import "time"
+
+// Config defines sqs broker configuration.
+type Config struct {
+ // Addr of AMQP server (example: amqp://guest:guest@localhost:5672/).
+ Addr string
+
+ // Timeout to allocate the connection. Default 10 seconds.
+ Timeout int
+}
+
+// TimeoutDuration returns number of seconds allowed to redial
+func (c *Config) TimeoutDuration() time.Duration {
+ timeout := c.Timeout
+ if timeout == 0 {
+ timeout = 10
+ }
+
+ return time.Duration(timeout) * time.Second
+}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
new file mode 100644
index 00000000..0e8d02ac
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -0,0 +1 @@
+package amqp
diff --git a/plugins/jobs/brokers/ephemeral/config.go b/plugins/jobs/brokers/ephemeral/config.go
new file mode 100644
index 00000000..847b63ea
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/config.go
@@ -0,0 +1 @@
+package ephemeral
diff --git a/plugins/jobs/brokers/ephemeral/plugin.go b/plugins/jobs/brokers/ephemeral/plugin.go
new file mode 100644
index 00000000..3028e79a
--- /dev/null
+++ b/plugins/jobs/brokers/ephemeral/plugin.go
@@ -0,0 +1,8 @@
+package ephemeral
+
+type Plugin struct {
+}
+
+func (p *Plugin) Init() error {
+ return nil
+}
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..1e49b959
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,64 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/dispatcher"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // Workers configures roadrunner server and worker busy.
+ // Workers *roadrunner.ServerConfig
+ poolCfg poolImpl.Config
+
+ // Dispatch defines where and how to match jobs.
+ Dispatch map[string]*structs.Options
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*pipeline.Pipeline
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string
+
+ // parent config for broken options.
+ pipelines pipeline.Pipelines
+ route dispatcher.Dispatcher
+}
+
+func (c *Config) InitDefaults() error {
+ const op = errors.Op("config_init_defaults")
+ var err error
+ c.pipelines, err = pipeline.InitPipelines(c.Pipelines)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// MatchPipeline locates the pipeline associated with the job.
+func (c *Config) MatchPipeline(job *structs.Job) (*pipeline.Pipeline, *structs.Options, error) {
+ const op = errors.Op("config_match_pipeline")
+ opt := c.route.Match(job)
+
+ pipe := ""
+ if job.Options != nil {
+ pipe = job.Options.Pipeline
+ }
+
+ if pipe == "" && opt != nil {
+ pipe = opt.Pipeline
+ }
+
+ if pipe == "" {
+ return nil, nil, errors.E(op, errors.Errorf("unable to locate pipeline for `%s`", job.Job))
+ }
+
+ if p := c.pipelines.Get(pipe); p != nil {
+ return p, opt, nil
+ }
+
+ return nil, nil, errors.E(op, errors.Errorf("undefined pipeline `%s`", pipe))
+}
diff --git a/plugins/jobs/dispatcher/dispatcher.go b/plugins/jobs/dispatcher/dispatcher.go
new file mode 100644
index 00000000..e73e7b74
--- /dev/null
+++ b/plugins/jobs/dispatcher/dispatcher.go
@@ -0,0 +1,49 @@
+package dispatcher
+
+import (
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+var separators = []string{"/", "-", "\\"}
+
+// Dispatcher provides ability to automatically locate the pipeline for the specific job
+// and update job options (if none set).
+type Dispatcher map[string]*structs.Options
+
+// pre-compile patterns
+func initDispatcher(routes map[string]*structs.Options) Dispatcher {
+ dispatcher := make(Dispatcher)
+ for pattern, opts := range routes {
+ pattern = strings.ToLower(pattern)
+ pattern = strings.Trim(pattern, "-.*")
+
+ for _, s := range separators {
+ pattern = strings.ReplaceAll(pattern, s, ".")
+ }
+
+ dispatcher[pattern] = opts
+ }
+
+ return dispatcher
+}
+
+// Match clarifies target job pipeline and other job options. Can return nil.
+func (dispatcher Dispatcher) Match(job *structs.Job) (found *structs.Options) {
+ var best = 0
+
+ jobName := strings.ToLower(job.Job)
+ for pattern, opts := range dispatcher {
+ if strings.HasPrefix(jobName, pattern) && len(pattern) > best {
+ found = opts
+ best = len(pattern)
+ }
+ }
+
+ if best == 0 {
+ return nil
+ }
+
+ return found
+}
diff --git a/plugins/jobs/dispatcher/dispatcher_test.go b/plugins/jobs/dispatcher/dispatcher_test.go
new file mode 100644
index 00000000..e584bda8
--- /dev/null
+++ b/plugins/jobs/dispatcher/dispatcher_test.go
@@ -0,0 +1,55 @@
+package dispatcher
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Map_All(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"default": {Pipeline: "default"}})
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "default"}).Pipeline)
+}
+
+func Test_Map_Miss(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{"some.*": {Pipeline: "default"}})
+
+ assert.Nil(t, m.Match(&structs.Job{Job: "miss"}))
+}
+
+func Test_Map_Best(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestUpper(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.Other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.OTHER"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "Some.other.job"}).Pipeline)
+}
+
+func Test_Map_BestReversed(t *testing.T) {
+ m := initDispatcher(map[string]*structs.Options{
+ "some.*": {Pipeline: "default"},
+ "some.other.*": {Pipeline: "other"},
+ })
+
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other.job"}).Pipeline)
+ assert.Equal(t, "other", m.Match(&structs.Job{Job: "some.other"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some.any"}).Pipeline)
+ assert.Equal(t, "default", m.Match(&structs.Job{Job: "some"}).Pipeline)
+}
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
new file mode 100644
index 00000000..0639f448
--- /dev/null
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-22T08:56:50.739Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.77 Electron/13.1.2 Safari/537.36" etag="Kdvb2D1nWMjMRedS8I3V" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7Vvpc9o4FP9rmKGZCePb5mMIOdpttzTZnbb7TdgCu7UtVxZX/vqVbPmUIDQxkLQlB+hJlvXe+71DT6anX0brGwwS/wPyYNjTFG/d08c9TdMUS6NvjLLJKaqmmjlljgOP0yrCffAAOVHh1EXgwbQxkCAUkiBpEl0Ux9AlDRrAGK2aw2YobN41AXMoEO5dEIrUz4FH/GJ19FX13MJg7hOhKwLFeE5IfeChVY2kX/X0S4wQyT9F60sYMgEWosmvu97SW64Nw5jsc8GXOF7fqiiCywcS2zef4H+Ke64Z+TRLEC4403y1ZFNIAaNF7EE2i9LTRys/IPA+AS7rXVHFU5pPopC2VPpxFoThJQoRzq7VPQCdmUvpKcHoO6z1WK4DpzPaI/LBWVtCTOC6RuJ83UDKBMEbOoT36g6XMQfa0C4QtKrUphWD/LrCCu0ADpV5OXklSvqBS/NnJCvIEXoUXLyJMPHRHMUgvKqoo6akqzHvEUq4fL9BQjbcUsCCoKb0qQjx5gu7fmAWza/1vvGaT563Nry1VQcpWmAX7mKT80kAnkOyYyCfkAlhp0oxDAEJlk0LlGmHXzpBAV1zCQXDMJtQMO2WhvOV8svq9iLM1AKVOWzNlAtHmOkCY7CpDUvYgFQAVMnz0zGmnwRj64DUIEZbXwtM0c8VwFhjU0fbUXGp7glL41iwbIFJt/aDZVdQKXxq5efvJnQm5Zz+Tf69v6WT5m90QZvY7TYImNDxDFkQcLSpblkdBQG7JWJDE4OAakiCgH6oGDA8qX3WrbO01Z+zT+Vg9mnsGzaU4xio1kZPmVMcy0D/OPPng8U6kjfXhwNnaJmmamuGqtvDJnS0dlJ5YOiIOfwHQFyfefa3k6v3b/++6tSdQ5U6dFvmzoeWrYOu3Hk7p9dl7lyTuPODpfT2KS1Ubbhz+6X5c3NPE+3cQp+lUefV+Vy1oc5KuydTaOf7umcp1BRcYU+zQrr+UdLQs/Vjweouo4iyF8Q9/YL2Ksma/s+EpuT0c8JUyvqMWh91YOQchMGcX+dS2ULc7PagizDlF/ExDDI4DGJY3Zp+mvP3bIHTgvAOTalYLu4JXrhkgWExgApk2r6I0pI2zcdtypN4D+GM5J0O62wvm64yu2KczY2DeF4yUt5/AjYhAt6j4z4mTFJpOa5sy5hsGSyLKIELwotcIeNMZSOunnHOxAjRUbMwq76xYMZiGooJt1BV4+1rEAUhs5xbGC4hm3VnHJyZ7Eda28pefNYaPX91Ex8ttV2eUCTx0ZLEx3YZozPjs34B4yug9+LtbxIkMOfpEeMaU8e7eXTUBSEwSkhlg1muup3JQ7B0x1DYXG5tFVNBrMda1z9BBNGCPGVRf5xV1usItdR9nZWqHspbqaqgnpOl83tn89qxyqfKnulf5/XT5+lUEULQHbVcOMg2xP1vaPqm063wzHGhKz3emjqmYe7Uwf7Wo5rtrfBwz63wwSqbqiYIGkMaLON69tYSNIvNTWkWjo8HcX3U9o9R4Hm53cE0eADTbComVX6mQuc1Rz1zzOaippZyHykoJEYx7EYXmlCWcERdDI9ZlVDF8k9Cc4Oexsr76FfWhdG2C80SdSE79T2cLsT9Zy1PO6d5AYiYR+HZAc0/Eio0npOZ44CJfsZ8jz3q2ePfRWsSC1KPa0Inrey97Np7UVN/vBDkdJ0KPKn4rrZj3qHPbcRdbwTpHZm4K9Pf5odfYR1ePX3ycdpz1Zdtrs6e5lpo7MTmqhz5rKyQT81cbyBb7ojZECskzTCKmAB9mDE/D1IaWSErYk6zIaxEsgwAt+9Bfl2/433FbKbJ9xWeNbXMQ5m2IovEMtNuP7zS3VNz4gZuskj9fpXR5ns4hVonvX1Nbx1K/zQPLTrDfaVvHEz64lNCW8T7a2SiVhv/hqgB85iJqCZuqwXRowVhScVl+cyzItbu6M81u+1ojoEXwKqPC68F+HK4VNIeSP3SkgqtvgdTGE6olvIy+3iKCKF+U1R7s8RZoqNufZLqZhHL2S1BmuSMzoI1W0cOGIivljDHTYYRHyTsgmg9Zw+jD8AqNQaLNLtXF1BR1CZUbF2Eiu2IUClo3UNFFz3l7YQSPn+8++vqToBNIaAEIxem6eNOcQrc7/PMjX7MEbclVB2lBGbqLVO1DWdg7ukuO6g2Plgzb6U718b36OLrzWI9sYe356IGjnfeJTnIyksLodiqygqt+kOrteVEozz0oi4nXURZktTPc6E3Jzobk4pKOMzZJcdslCDFewJI/6wo2bA8o39GE450wDry/ANijBp8d3MEdSiOsuzprCpCMRbO3uU5VL84DHyFfHEsNpT1SpZ+x/cUTaD19Gv+SxVDvWhIYx7baqQoXMJ0C3dbfDzO4mXdmYtnijLvv8UrS3z39rOK0gXvOqwon9Vu+Oku8lqpo/5N6gWtrfyjBQSprCTPfUnHOXIYdH7Ot2uR9dwn+/LCczaBXYDfaoPfkO3pFAn4uyiWbddTQ1Q/BiyksVzCB3QPEb4AsbWfZTrkASdtVl+6zMtK1ddX9av/AQ==</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/interface.go b/plugins/jobs/interface.go
new file mode 100644
index 00000000..b4862038
--- /dev/null
+++ b/plugins/jobs/interface.go
@@ -0,0 +1,14 @@
+package jobs
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+)
+
+// Consumer todo naming
+type Consumer interface {
+ Push(*pipeline.Pipeline, *structs.Job) (string, error)
+ Stat()
+ Consume(*pipeline.Pipeline)
+ Register(*pipeline.Pipeline)
+}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
new file mode 100644
index 00000000..f27f6ede
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -0,0 +1,172 @@
+package pipeline
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+)
+
+// Pipelines is list of Pipeline.
+
+type Pipelines []*Pipeline
+
+func InitPipelines(pipes map[string]*Pipeline) (Pipelines, error) {
+ const op = errors.Op("pipeline_init")
+ out := make(Pipelines, 0)
+
+ for name, pipe := range pipes {
+ if pipe.Broker() == "" {
+ return nil, errors.E(op, errors.Errorf("found the pipeline without defined broker"))
+ }
+
+ p := pipe.With("name", name)
+ out = append(out, &p)
+ }
+
+ return out, nil
+}
+
+// Reverse returns pipelines in reversed order.
+func (ps Pipelines) Reverse() Pipelines {
+ out := make(Pipelines, len(ps))
+
+ for i, p := range ps {
+ out[len(ps)-i-1] = p
+ }
+
+ return out
+}
+
+// Broker return pipelines associated with specific broker.
+func (ps Pipelines) Broker(broker string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, p := range ps {
+ if p.Broker() != broker {
+ continue
+ }
+
+ out = append(out, p)
+ }
+
+ return out
+}
+
+// Names returns only pipelines with specified names.
+func (ps Pipelines) Names(only ...string) Pipelines {
+ out := make(Pipelines, 0)
+
+ for _, name := range only {
+ for _, p := range ps {
+ if p.Name() == name {
+ out = append(out, p)
+ }
+ }
+ }
+
+ return out
+}
+
+// Get returns pipeline by it'svc name.
+func (ps Pipelines) Get(name string) *Pipeline {
+ // possibly optimize
+ for _, p := range ps {
+ if p.Name() == name {
+ return p
+ }
+ }
+
+ return nil
+}
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+// With pipeline value. Immutable.
+func (p Pipeline) With(name string, value interface{}) Pipeline {
+ out := make(map[string]interface{})
+ for k, v := range p {
+ out[k] = v
+ }
+ out[name] = value
+
+ return out
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String("name", "")
+}
+
+// Broker associated with the pipeline.
+func (p Pipeline) Broker() string {
+ return p.String("broker", "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// Map must return nested map value or empty config.
+func (p Pipeline) Map(name string) Pipeline {
+ out := make(map[string]interface{})
+
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+
+ return out
+}
+
+// Bool must return option value as string or return default value.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if b, ok := value.(bool); ok {
+ return b
+ }
+ }
+
+ return d
+}
+
+// String must return option value as string or return default value.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Integer must return option value as string or return default value.
+func (p Pipeline) Integer(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Duration must return option value as time.Duration (seconds) or return default value.
+func (p Pipeline) Duration(name string, d time.Duration) time.Duration {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(int); ok {
+ return time.Second * time.Duration(str)
+ }
+ }
+
+ return d
+}
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
new file mode 100644
index 00000000..f03dcbb8
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -0,0 +1,90 @@
+package pipeline
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPipeline_Map(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, 10, pipe.Map("options").Integer("ttl", 0))
+ assert.Equal(t, 0, pipe.Map("other").Integer("ttl", 0))
+}
+
+func TestPipeline_MapString(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"alias": "default"}}
+
+ assert.Equal(t, "default", pipe.Map("options").String("alias", ""))
+ assert.Equal(t, "", pipe.Map("other").String("alias", ""))
+}
+
+func TestPipeline_Bool(t *testing.T) {
+ pipe := Pipeline{"value": true}
+
+ assert.Equal(t, true, pipe.Bool("value", false))
+ assert.Equal(t, true, pipe.Bool("other", true))
+}
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Integer(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, 1, pipe.Integer("value", 0))
+ assert.Equal(t, 1, pipe.Integer("other", 1))
+}
+
+func TestPipeline_Duration(t *testing.T) {
+ pipe := Pipeline{"value": 1}
+
+ assert.Equal(t, time.Second, pipe.Duration("value", 0))
+ assert.Equal(t, time.Second, pipe.Duration("other", time.Second))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
+
+func TestPipeline_FilterBroker(t *testing.T) {
+ pipes := Pipelines{
+ &Pipeline{"name": "first", "broker": "a"},
+ &Pipeline{"name": "second", "broker": "a"},
+ &Pipeline{"name": "third", "broker": "b"},
+ &Pipeline{"name": "forth", "broker": "b"},
+ }
+
+ filtered := pipes.Names("first", "third")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[0].Broker())
+ assert.Equal(t, "b", filtered[1].Broker())
+
+ filtered = pipes.Names("first", "third").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "a", filtered[1].Broker())
+ assert.Equal(t, "b", filtered[0].Broker())
+
+ filtered = pipes.Broker("a")
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[0].Name())
+ assert.Equal(t, "second", filtered[1].Name())
+
+ filtered = pipes.Broker("a").Reverse()
+ assert.True(t, len(filtered) == 2)
+
+ assert.Equal(t, "first", filtered[1].Name())
+ assert.Equal(t, "second", filtered[0].Name())
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
new file mode 100644
index 00000000..072f872a
--- /dev/null
+++ b/plugins/jobs/plugin.go
@@ -0,0 +1,113 @@
+package jobs
+
+import (
+ "context"
+ "fmt"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+)
+
+const (
+ // RrJobs env variable
+ RrJobs string = "rr_jobs"
+ PluginName string = "jobs"
+)
+
+type Plugin struct {
+ cfg *Config
+ log logger.Logger
+
+ workersPool pool.Pool
+
+ consumers map[string]Consumer
+ events events.Handler
+}
+
+func testListener(data interface{}) {
+ fmt.Println(data)
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+ const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.workersPool, err = server.NewWorkerPool(context.Background(), p.cfg.poolCfg, map[string]string{RrJobs: "true"}, testListener)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(testListener)
+ p.consumers = make(map[string]Consumer)
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c Consumer) {
+ p.consumers[name.Name()] = c
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Push(j *structs.Job) (string, error) {
+ pipe, pOpts, err := p.cfg.MatchPipeline(j)
+ if err != nil {
+ panic(err)
+ }
+
+ if pOpts != nil {
+ j.Options.Merge(pOpts)
+ }
+
+ broker, ok := p.consumers[pipe.Broker()]
+ if !ok {
+ panic("broker not found")
+ }
+
+ id, err := broker.Push(pipe, j)
+ if err != nil {
+ panic(err)
+ }
+
+ // p.events.Push()
+
+ return id, nil
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{log: p.log}
+}
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..e77cda59
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,20 @@
+package jobs
+
+import (
+ "github.com/spiral/roadrunner/v2/plugins/jobs/structs"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ log logger.Logger
+ p *Plugin
+}
+
+func (r *rpc) Push(j *structs.Job, idRet *string) error {
+ id, err := r.p.Push(j)
+ if err != nil {
+ panic(err)
+ }
+ *idRet = id
+ return nil
+}
diff --git a/plugins/jobs/structs/job.go b/plugins/jobs/structs/job.go
new file mode 100644
index 00000000..2e394543
--- /dev/null
+++ b/plugins/jobs/structs/job.go
@@ -0,0 +1,35 @@
+package structs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains name of job broker (usually PHP class).
+ Job string `json:"job"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Body packs job payload into binary payload.
+func (j *Job) Body() []byte {
+ return utils.AsBytes(j.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (j *Job) Context(id string) []byte {
+ ctx, _ := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ }{ID: id, Job: j.Job},
+ )
+
+ return ctx
+}
diff --git a/plugins/jobs/structs/job_options.go b/plugins/jobs/structs/job_options.go
new file mode 100644
index 00000000..1507d053
--- /dev/null
+++ b/plugins/jobs/structs/job_options.go
@@ -0,0 +1,70 @@
+package structs
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int `json:"delay,omitempty"`
+
+ // Attempts define maximum job retries. Attention, value 1 will only allow job to execute once (without retry).
+ // Minimum valuable value is 2.
+ Attempts int `json:"maxAttempts,omitempty"`
+
+ // RetryDelay defines for how long job should be waiting until next retry. Defaults to none.
+ RetryDelay int `json:"retryDelay,omitempty"`
+
+ // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
+ Timeout int `json:"timeout,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Attempts == 0 {
+ o.Attempts = from.Attempts
+ }
+
+ if o.Timeout == 0 {
+ o.Timeout = from.Timeout
+ }
+
+ if o.RetryDelay == 0 {
+ o.RetryDelay = from.RetryDelay
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// CanRetry must return true if broker is allowed to re-run the job.
+func (o *Options) CanRetry(attempt int) bool {
+ // Attempts 1 and 0 has identical effect
+ return o.Attempts > (attempt + 1)
+}
+
+// RetryDuration returns retry delay duration in a form of time.Duration.
+func (o *Options) RetryDuration() time.Duration {
+ return time.Second * time.Duration(o.RetryDelay)
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+// TimeoutDuration returns timeout duration in a form of time.Duration.
+func (o *Options) TimeoutDuration() time.Duration {
+ if o.Timeout == 0 {
+ return 30 * time.Minute
+ }
+
+ return time.Second * time.Duration(o.Timeout)
+}
diff --git a/plugins/jobs/structs/job_options_test.go b/plugins/jobs/structs/job_options_test.go
new file mode 100644
index 00000000..18702394
--- /dev/null
+++ b/plugins/jobs/structs/job_options_test.go
@@ -0,0 +1,110 @@
+package structs
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestOptions_CanRetry(t *testing.T) {
+ opts := &Options{Attempts: 0}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_SameValue(t *testing.T) {
+ opts := &Options{Attempts: 1}
+
+ assert.False(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+}
+
+func TestOptions_CanRetry_Value(t *testing.T) {
+ opts := &Options{Attempts: 2}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.False(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_CanRetry_Value3(t *testing.T) {
+ opts := &Options{Attempts: 3}
+
+ assert.True(t, opts.CanRetry(0))
+ assert.True(t, opts.CanRetry(1))
+ assert.False(t, opts.CanRetry(2))
+}
+
+func TestOptions_RetryDuration(t *testing.T) {
+ opts := &Options{RetryDelay: 0}
+ assert.Equal(t, time.Duration(0), opts.RetryDuration())
+}
+
+func TestOptions_RetryDuration2(t *testing.T) {
+ opts := &Options{RetryDelay: 1}
+ assert.Equal(t, time.Second, opts.RetryDuration())
+}
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_TimeoutDuration(t *testing.T) {
+ opts := &Options{Timeout: 0}
+ assert.Equal(t, time.Minute*30, opts.TimeoutDuration())
+}
+
+func TestOptions_TimeoutDuration2(t *testing.T) {
+ opts := &Options{Timeout: 1}
+ assert.Equal(t, time.Second, opts.TimeoutDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, 1, opts.Attempts)
+ assert.Equal(t, 2, opts.Delay)
+ assert.Equal(t, 1, opts.Timeout)
+ assert.Equal(t, 1, opts.RetryDelay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ Timeout: 10,
+ Attempts: 10,
+ RetryDelay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ Timeout: 1,
+ Attempts: 1,
+ RetryDelay: 1,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, 10, opts.Attempts)
+ assert.Equal(t, 10, opts.Delay)
+ assert.Equal(t, 10, opts.Timeout)
+ assert.Equal(t, 10, opts.RetryDelay)
+}
diff --git a/plugins/jobs/structs/job_test.go b/plugins/jobs/structs/job_test.go
new file mode 100644
index 00000000..e7240c6b
--- /dev/null
+++ b/plugins/jobs/structs/job_test.go
@@ -0,0 +1,19 @@
+package structs
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestJob_Body(t *testing.T) {
+ j := &Job{Payload: "hello"}
+
+ assert.Equal(t, []byte("hello"), j.Body())
+}
+
+func TestJob_Context(t *testing.T) {
+ j := &Job{Job: "job"}
+
+ assert.Equal(t, []byte(`{"id":"id","job":"job"}`), j.Context("id"))
+}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 4b675271..e5aac290 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,8 +9,8 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 6ae1a1f6..c839130f 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -2,12 +2,15 @@ package boltdb
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "boltdb"
+const (
+ PluginName string = "boltdb"
+ RootPluginName string = "kv"
+)
// Plugin BoltDB K/V storage.
type Plugin struct {
@@ -21,7 +24,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index a2787d72..520ec7d5 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,8 +6,8 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 22ea5cca..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -2,12 +2,15 @@ package memcached
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-const PluginName = "memcached"
+const (
+ PluginName string = "memcached"
+ RootPluginName string = "kv"
+)
type Plugin struct {
// config plugin
@@ -17,7 +20,7 @@ type Plugin struct {
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
+ if !cfg.Has(RootPluginName) {
return errors.E(errors.Disabled)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 03dbaed6..e9ea25df 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -5,10 +5,12 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
+// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
const PluginName string = "kv"
const (
@@ -25,9 +27,9 @@ const (
type Plugin struct {
log logger.Logger
// constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
- constructors map[string]Constructor
+ constructors map[string]kv.Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
- storages map[string]Storage
+ storages map[string]kv.Storage
// KV configuration
cfg Config
cfgPlugin config.Configurer
@@ -43,8 +45,8 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.constructors = make(map[string]Constructor, 5)
- p.storages = make(map[string]Storage, 5)
+ p.constructors = make(map[string]kv.Constructor, 5)
+ p.storages = make(map[string]kv.Storage, 5)
p.log = log
p.cfgPlugin = cfg
return nil
@@ -203,7 +205,7 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor kv.Constructor) {
// save the storage constructor
p.constructors[name.Name()] = constructor
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index af763600..b9b302fe 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,6 +2,7 @@ package kv
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
@@ -9,7 +10,7 @@ import (
// Wrapper for the plugin
type rpc struct {
// all available storages
- storages map[string]Storage
+ storages map[string]kv.Storage
// svc is a plugin implementing Storage interface
srv *Plugin
// Logger
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 1cf031d1..1906e4fd 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,8 +6,8 @@ import (
"time"
"github.com/spiral/errors"
+ kv "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 70badf15..7d418a70 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,9 +2,9 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -21,7 +21,6 @@ type Plugin struct {
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
- p.log = log
p.cfgPlugin = cfg
p.stop = make(chan struct{}, 1)
return nil
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index d027a8a5..3c909900 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -3,8 +3,8 @@ package memory
import (
"sync"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go
index 5817853c..0cd62d19 100644
--- a/plugins/redis/channel.go
+++ b/plugins/redis/channel.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 320b7443..378d7630 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,8 +7,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 9d98790b..3c62a63f 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,9 +5,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 4e41acb5..8bd78514 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 00639f43..e2fa0086 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -124,7 +124,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...event
const op = errors.Op("server_plugin_new_worker")
list := make([]events.Listener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
+ list = append(list, server.collectWorkerEvents)
spawnCmd, err := server.CmdFactory(env)
if err != nil {
@@ -147,8 +147,8 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt pool.Config, env En
return nil, errors.E(op, err)
}
- list := make([]events.Listener, 0, 1)
- list = append(list, server.collectEvents)
+ list := make([]events.Listener, 0, 22)
+ list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
if len(listeners) != 0 {
list = append(list, listeners...)
}
@@ -209,7 +209,7 @@ func (server *Plugin) setEnv(e Env) []string {
return env
}
-func (server *Plugin) collectEvents(event interface{}) {
+func (server *Plugin) collectPoolEvents(event interface{}) {
if we, ok := event.(events.PoolEvent); ok {
switch we.Event {
case events.EventMaxMemory:
@@ -238,7 +238,9 @@ func (server *Plugin) collectEvents(event interface{}) {
server.log.Warn("requested pool restart")
}
}
+}
+func (server *Plugin) collectWorkerEvents(event interface{}) {
if we, ok := event.(events.WorkerEvent); ok {
switch we.Event {
case events.EventWorkerError:
@@ -264,16 +266,13 @@ func (server *Plugin) collectEvents(event interface{}) {
}
}
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
- case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- // stderr event is INFO level
- case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
+func (server *Plugin) collectJobsEvents(event interface{}) { //nolint:unused
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventJobStart:
+ server.log.Info("Job started", "start", jev.Start, "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ server.log.Info("Job OK", "start", jev.Start, "elapsed", jev.Elapsed)
}
}
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 664b4dfd..c1f79a78 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,7 +7,7 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index ca5f2f59..c9a31613 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 752ba3ce..758620f6 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,7 +4,7 @@ import (
"sync"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
new file mode 100644
index 00000000..46434fa8
--- /dev/null
+++ b/proto/jobs/v1beta/jobs.proto
@@ -0,0 +1,22 @@
+syntax = "proto3";
+
+package kv.v1beta;
+option go_package = "./;jobsv1beta";
+
+message Request {
+ // could be an enum in the future
+ string storage = 1;
+ repeated Item items = 2;
+}
+
+message Item {
+ string key = 1;
+ bytes value = 2;
+ // RFC 3339
+ string timeout = 3;
+}
+
+// KV response for the KV RPC methods
+message Response {
+ repeated Item items = 1;
+}
diff --git a/proto/kv/v1beta/kv.pb.go b/proto/kv/v1beta/kv.pb.go
index 622967b8..75578bff 100644
--- a/proto/kv/v1beta/kv.pb.go
+++ b/proto/kv/v1beta/kv.pb.go
@@ -7,10 +7,11 @@
package kvv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/proto/websockets/v1beta/websockets.pb.go b/proto/websockets/v1beta/websockets.pb.go
index ad4ebbe7..a2868118 100644
--- a/proto/websockets/v1beta/websockets.pb.go
+++ b/proto/websockets/v1beta/websockets.pb.go
@@ -7,10 +7,11 @@
package websocketsv1beta
import (
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
+
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
diff --git a/tests/composer.json b/tests/composer.json
index 50178d1f..fa5925b7 100644
--- a/tests/composer.json
+++ b/tests/composer.json
@@ -2,7 +2,7 @@
"minimum-stability": "beta",
"prefer-stable": true,
"require": {
- "nyholm/psr7": "^1.3",
+ "nyholm/psr7": "^1.4",
"spiral/roadrunner": "^2.0",
"spiral/roadrunner-http": "^2.0",
"temporal/sdk": ">=1.0",
diff --git a/tests/docker-compose-jobs.yml b/tests/docker-compose-jobs.yml
new file mode 100644
index 00000000..7b88c9cf
--- /dev/null
+++ b/tests/docker-compose-jobs.yml
@@ -0,0 +1,22 @@
+version: "3"
+
+services:
+ beanstalk:
+ image: schickling/beanstalkd
+ ports:
+ - "11300:11300"
+
+ sqs:
+ image: vsouza/sqs-local
+ ports:
+ - "9324:9324"
+
+ rabbitmq:
+ image: rabbitmq:3-management
+ environment:
+ RABBITMQ_DEFAULT_USER: guest
+ RABBITMQ_DEFAULT_PASS: guest
+ RABBITMQ_DEFAULT_VHOST: /
+ ports:
+ - "15672:15672"
+ - "5672:5672" \ No newline at end of file
diff --git a/tests/plugins/broadcast/plugins/plugin1.go b/tests/plugins/broadcast/plugins/plugin1.go
index d3b16256..390ba581 100644
--- a/tests/plugins/broadcast/plugins/plugin1.go
+++ b/tests/plugins/broadcast/plugins/plugin1.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/broadcast/plugins/plugin2.go b/tests/plugins/broadcast/plugins/plugin2.go
index 2bd819d2..809020dc 100644
--- a/tests/plugins/broadcast/plugins/plugin2.go
+++ b/tests/plugins/broadcast/plugins/plugin2.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/broadcast/plugins/plugin3.go b/tests/plugins/broadcast/plugins/plugin3.go
index ef926222..4507a5b9 100644
--- a/tests/plugins/broadcast/plugins/plugin3.go
+++ b/tests/plugins/broadcast/plugins/plugin3.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/broadcast/plugins/plugin4.go b/tests/plugins/broadcast/plugins/plugin4.go
index c9b94777..6783855e 100644
--- a/tests/plugins/broadcast/plugins/plugin4.go
+++ b/tests/plugins/broadcast/plugins/plugin4.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/broadcast/plugins/plugin5.go b/tests/plugins/broadcast/plugins/plugin5.go
index 01562a8f..fade6b66 100644
--- a/tests/plugins/broadcast/plugins/plugin5.go
+++ b/tests/plugins/broadcast/plugins/plugin5.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/broadcast/plugins/plugin6.go b/tests/plugins/broadcast/plugins/plugin6.go
index 76f2d6e8..d98a50b7 100644
--- a/tests/plugins/broadcast/plugins/plugin6.go
+++ b/tests/plugins/broadcast/plugins/plugin6.go
@@ -3,7 +3,7 @@ package plugins
import (
"fmt"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
"github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/tests/plugins/headers/configs/.rr-cors-headers.yaml b/tests/plugins/headers/configs/.rr-cors-headers.yaml
index 9d2ef7e5..b4e960f1 100644
--- a/tests/plugins/headers/configs/.rr-cors-headers.yaml
+++ b/tests/plugins/headers/configs/.rr-cors-headers.yaml
@@ -1,9 +1,5 @@
server:
command: "php ../../http/client.php headers pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
diff --git a/tests/psr-worker-bench.php b/tests/psr-worker-bench.php
index d0c72eae..b4a028d4 100644
--- a/tests/psr-worker-bench.php
+++ b/tests/psr-worker-bench.php
@@ -56,4 +56,4 @@ if ($env->getMode() === 'http') {
}
$factory->run();
-} \ No newline at end of file
+}
diff --git a/tests/psr-worker.php b/tests/psr-worker.php
index db53eee2..de4befbc 100644
--- a/tests/psr-worker.php
+++ b/tests/psr-worker.php
@@ -20,7 +20,7 @@ while ($req = $psr7->waitRequest()) {
try {
$resp = new \Nyholm\Psr7\Response();
$resp->getBody()->write(str_repeat("hello world", 1000));
-
+
$psr7->respond($resp);
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
diff --git a/tests/worker-cors.php b/tests/worker-cors.php
new file mode 100644
index 00000000..ea3c986c
--- /dev/null
+++ b/tests/worker-cors.php
@@ -0,0 +1,15 @@
+<?php
+
+use Spiral\RoadRunner\Worker;
+use Spiral\RoadRunner\Http\HttpWorker;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . '/vendor/autoload.php';
+
+$http = new HttpWorker(Worker::create());
+
+while ($req = $http->waitRequest()) {
+ $http->respond(200, 'Response', [
+ 'Access-Control-Allow-Origin' => ['*']
+ ]);
+}