summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
committerValery Piashchynski <[email protected]>2021-08-30 21:32:50 +0300
commitc7d9385f135853539100430521042f7e7e2ae005 (patch)
tree588f45f6cfcd716bb3197ebff8cfdbc86a984afc
parentf6070d04558ce2e06a114ec2d9a8557d6f88d89b (diff)
Tests for the boltdb jobs.
Fix issue with Stop in the jobs plugin which didn't destroy the pool. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--CHANGELOG.md5
-rw-r--r--go.mod28
-rw-r--r--go.sum65
-rw-r--r--plugins/amqp/amqpjobs/consumer.go4
-rw-r--r--plugins/boltdb/boltjobs/consumer.go200
-rw-r--r--plugins/boltdb/boltjobs/item.go157
-rw-r--r--plugins/boltdb/boltjobs/listener.go144
-rw-r--r--plugins/boltdb/doc/boltjobs.drawio1
-rw-r--r--plugins/boltdb/doc/job_lifecycle.md10
-rw-r--r--plugins/ephemeral/consumer.go119
-rw-r--r--plugins/jobs/plugin.go12
-rw-r--r--plugins/sqs/consumer.go84
-rw-r--r--plugins/sqs/item.go8
-rw-r--r--plugins/sqs/listener.go36
-rw-r--r--tests/plugins/jobs/beanstalk/.rr-no-global.yaml3
-rw-r--r--tests/plugins/jobs/boltdb/.rr-no-global.yaml12
-rw-r--r--tests/plugins/jobs/helpers.go2
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go2
-rw-r--r--tests/plugins/jobs/jobs_boltdb_test.go524
-rw-r--r--tests/plugins/jobs/jobs_general_test.go5
-rw-r--r--tests/plugins/jobs/jobs_sqs_test.go14
21 files changed, 938 insertions, 497 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5e29bdb8..93bcf13b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,11 +11,12 @@ v2.4.0 (_.08.2021)
## 👀 New:
- ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime.
- Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral`. [PR](https://github.com/spiral/roadrunner/pull/726)
+ Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
- Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
## 🩹 Fixes:
-- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel.
+- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750).
+- 🐛 Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772).
## 📈 Summary:
diff --git a/go.mod b/go.mod
index 38ab47cf..a8932d87 100644
--- a/go.mod
+++ b/go.mod
@@ -6,11 +6,11 @@ require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/alicebob/miniredis/v2 v2.15.1
// ========= AWS SDK v2
- github.com/aws/aws-sdk-go-v2 v1.8.1
- github.com/aws/aws-sdk-go-v2/config v1.6.1
- github.com/aws/aws-sdk-go-v2/credentials v1.3.3
- github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2
- github.com/aws/smithy-go v1.7.0
+ github.com/aws/aws-sdk-go-v2 v1.9.0
+ github.com/aws/aws-sdk-go-v2/config v1.7.0
+ github.com/aws/aws-sdk-go-v2/credentials v1.4.0
+ github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0
+ github.com/aws/smithy-go v1.8.0
// =====================
github.com/beanstalkd/go-beanstalk v0.1.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
@@ -18,11 +18,11 @@ require (
github.com/fasthttp/websocket v1.4.3
github.com/fatih/color v1.12.0
github.com/go-redis/redis/v8 v8.11.3
- github.com/gofiber/fiber/v2 v2.17.0
+ github.com/gofiber/fiber/v2 v2.18.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/json-iterator/go v1.1.11
- github.com/klauspost/compress v1.13.4
+ github.com/klauspost/compress v1.13.5
github.com/prometheus/client_golang v1.11.0
github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891
github.com/shirou/gopsutil v3.21.7+incompatible
@@ -38,7 +38,7 @@ require (
go.etcd.io/bbolt v1.3.6
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.0
- golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d
+ golang.org/x/net v0.0.0-20210825183410-e898025ed96a
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf
google.golang.org/protobuf v1.27.1
@@ -49,11 +49,11 @@ require (
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.0.3 // indirect
- github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1 // indirect
- github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1 // indirect
- github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3 // indirect
- github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 // indirect
- github.com/aws/aws-sdk-go-v2/service/sts v1.6.2 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.4.0 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.7.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -80,7 +80,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
- github.com/tklauser/go-sysconf v0.3.8 // indirect
+ github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.29.0 // indirect
diff --git a/go.sum b/go.sum
index 4dc82718..776d4212 100644
--- a/go.sum
+++ b/go.sum
@@ -61,26 +61,26 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
-github.com/aws/aws-sdk-go-v2 v1.8.1 h1:GcFgQl7MsBygmeeqXyV1ivrTEmsVz/rdFJaTcltG9ag=
-github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0=
-github.com/aws/aws-sdk-go-v2/config v1.6.1 h1:qrZINaORyr78syO1zfD4l7r4tZjy0Z1l0sy4jiysyOM=
-github.com/aws/aws-sdk-go-v2/config v1.6.1/go.mod h1:t/y3UPu0XEDy0cEw6mvygaBQaPzWiYAxfP2SzgtvclA=
-github.com/aws/aws-sdk-go-v2/credentials v1.3.3 h1:A13QPatmUl41SqUfnuT3V0E3XiNGL6qNTOINbE8cZL4=
-github.com/aws/aws-sdk-go-v2/credentials v1.3.3/go.mod h1:oVieKMT3m9BSfqhOfuQ+E0j/yN84ZAJ7Qv8Sfume/ak=
-github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1 h1:rc+fRGvlKbeSd9IFhFS1KWBs0XjTkq0CfK5xqyLgIp0=
-github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1/go.mod h1:+GTydg3uHmVlQdkRoetz6VHKbOMEYof70m19IpMLifc=
-github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1 h1:IkqRRUZTKaS16P2vpX+FNc2jq3JWa3c478gykQp4ow4=
-github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1/go.mod h1:Pv3WenDjI0v2Jl7UaMFIIbPOBbhn33RmmAmGgkXDoqY=
-github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3 h1:VxFCgxsqWe7OThOwJ5IpFX3xrObtuIH9Hg/NW7oot1Y=
-github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3/go.mod h1:7gcsONBmFoCcKrAqrm95trrMd2+C/ReYKP7Vfu8yHHA=
-github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2 h1:RFCFJzkGKSpVQZiTyMgoW3V7/uFIUFip5t6ljvD+Uo0=
-github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2/go.mod h1:TGLWOGp2jII8DZhzRUQXcrsYMvk7fqz8zYdNPq4YQ8Y=
-github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 h1:K2gCnGvAASpz+jqP9iyr+F/KNjmTYf8aWOtTQzhmZ5w=
-github.com/aws/aws-sdk-go-v2/service/sso v1.3.3/go.mod h1:Jgw5O+SK7MZ2Yi9Yvzb4PggAPYaFSliiQuWR0hNjexk=
-github.com/aws/aws-sdk-go-v2/service/sts v1.6.2 h1:l504GWCoQi1Pk68vSUFGLmDIEMzRfVGNgLakDK+Uj58=
-github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5Dcdy0jp6LhHqg55rjClkM=
-github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg=
-github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
+github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo=
+github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
+github.com/aws/aws-sdk-go-v2/config v1.7.0 h1:J2cZ7qe+3IpqBEXnHUrFrOjoB9BlsXg7j53vxcl5IVg=
+github.com/aws/aws-sdk-go-v2/config v1.7.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY=
+github.com/aws/aws-sdk-go-v2/credentials v1.4.0 h1:kmvesfjY861FzlCU9mvAfe01D9aeXcG2ZuC+k9F2YLM=
+github.com/aws/aws-sdk-go-v2/credentials v1.4.0/go.mod h1:dgGR+Qq7Wjcd4AOAW5Rf5Tnv3+x7ed6kETXyS9WCuAY=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 h1:OxTAgH8Y4BXHD6PGCJ8DHx2kaZPCQfSTqmDsdRZFezE=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0/go.mod h1:CpNzHK9VEFUCknu50kkB8z58AH2B5DvPP7ea1LHve/Y=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 h1:d95cddM3yTm4qffj3P6EnP+TzX1SSkWaQypXSgT/hpA=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2/go.mod h1:BQV0agm+JEhqR+2RT5e1XTFIDcAAV0eW6z2trp+iduw=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0 h1:VNJ5NLBteVXEwE2F1zEXVmyIH58mZ6kIQGJoC7C+vkg=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0/go.mod h1:R1KK+vY8AfalhG1AOu5e35pOD2SdoPKQCFLTvnxiohk=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0 h1:BI05Jbkaqp5IDxiobr3B59mX07lfpLJDv5NwAEx3wSs=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0/go.mod h1:BXA1CVaEd9TBOQ8G2ke7lMWdVggAeh35+h2HDO50z7s=
+github.com/aws/aws-sdk-go-v2/service/sso v1.4.0 h1:sHXMIKYS6YiLPzmKSvDpPmOpJDHxmAUgbiF49YNVztg=
+github.com/aws/aws-sdk-go-v2/service/sso v1.4.0/go.mod h1:+1fpWnL96DL23aXPpMGbsmKe8jLTEfbjuQoA4WS1VaA=
+github.com/aws/aws-sdk-go-v2/service/sts v1.7.0 h1:1at4e5P+lvHNl2nUktdM2/v+rpICg/QSEr9TO/uW9vU=
+github.com/aws/aws-sdk-go-v2/service/sts v1.7.0/go.mod h1:0qcSMCyASQPN2sk/1KQLQ2Fh6yq8wm0HSDAimPhzCoM=
+github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc=
+github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w=
github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
@@ -96,7 +96,6 @@ github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQ
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
-github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -128,8 +127,6 @@ github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
-github.com/fsnotify/fsnotify v1.5.0 h1:NO5hkcB+srp1x6QmwvNZLeaOgbM8cmBTN32THzjvu2k=
-github.com/fsnotify/fsnotify v1.5.0/go.mod h1:BX0DCEr5pT4jm2CnQdVP1lFV521fcCNcyEeNp4DQQDk=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -150,8 +147,8 @@ github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
-github.com/gofiber/fiber/v2 v2.17.0 h1:qP3PkGUbBB0i9iQh5E057XI1yO5CZigUxZhyUFYAFoM=
-github.com/gofiber/fiber/v2 v2.17.0/go.mod h1:iftruuHGkRYGEXVISmdD7HTYWyfS2Bh+Dkfq4n/1Owg=
+github.com/gofiber/fiber/v2 v2.18.0 h1:xCWYSVoTNibHpzfciPwUSZGiTyTpTXYchCwynuJU09s=
+github.com/gofiber/fiber/v2 v2.18.0/go.mod h1:/LdZHMUXZvTTo7gU4+b1hclqCAdoQphNQ9bi9gutPyI=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@@ -268,9 +265,9 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
-github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
-github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
+github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
+github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
@@ -352,8 +349,6 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
-github.com/rabbitmq/amqp091-go v0.0.0-20210812094702-b2a427eb7d17 h1:3HQ5TTZU56EjMWPU0K0Nqz1aakOLQUwV4lZ6tNxtXvc=
-github.com/rabbitmq/amqp091-go v0.0.0-20210812094702-b2a427eb7d17/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 h1:13nv5f/LNJxNpvpYm/u0NqrlFebon342f9Xu9GpklKc=
github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -401,15 +396,13 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
-github.com/tklauser/go-sysconf v0.3.8 h1:41Nq9J+pxKud4IQ830J5LlS5nl67dVQC7AuisUooaOU=
-github.com/tklauser/go-sysconf v0.3.8/go.mod h1:z4zYWRS+X53WUKtBcmDg1comV3fPhdQnzasnIHUoLDU=
-github.com/tklauser/numcpus v0.2.3/go.mod h1:vpEPS/JC+oZGGQ/My/vJnNsvMDQL6PwOqt8dsCw5j+E=
+github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
+github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE=
-github.com/valyala/fasthttp v1.26.0/go.mod h1:cmWIqlu99AO/RKcp1HWaViTqc57FswJOfYYdPJBl8BA=
github.com/valyala/fasthttp v1.29.0 h1:F5GKpytwFk5OhCuRh6H+d4vZAcEeNAwPTdwQnm6IERY=
github.com/valyala/fasthttp v1.29.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
@@ -541,8 +534,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c=
-golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw=
+golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -629,8 +622,6 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55 h1:rw6UNGRMfarCepjI8qOepea/SXwIBVfTKjztZ5gBbq4=
-golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index f1b4d54f..578f36ce 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -420,7 +420,9 @@ func (c *consumer) Resume(_ context.Context, p string) {
}
func (c *consumer) Stop(context.Context) error {
- c.stopCh <- struct{}{}
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
pipe := c.pipeline.Load().(*pipeline.Pipeline)
c.eh.Push(events.JobEvent{
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go
index 67a6d3e7..2492ab60 100644
--- a/plugins/boltdb/boltjobs/consumer.go
+++ b/plugins/boltdb/boltjobs/consumer.go
@@ -5,10 +5,10 @@ import (
"context"
"encoding/gob"
"os"
+ "sync"
"sync/atomic"
"time"
- "github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -22,11 +22,12 @@ import (
)
const (
- PluginName = "boltdb"
+ PluginName string = "boltdb"
+ rrDB string = "rr.db"
- PushBucket = "push"
- InQueueBucket = "processing"
- DoneBucket = "done"
+ PushBucket string = "push"
+ InQueueBucket string = "processing"
+ DelayBucket string = "delayed"
)
type consumer struct {
@@ -37,11 +38,16 @@ type consumer struct {
db *bolt.DB
- log logger.Logger
- eh events.Handler
- pq priorityqueue.Queue
+ bPool sync.Pool
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+ pipeline atomic.Value
+ cond *sync.Cond
+
listeners uint32
- pipeline atomic.Value
+ active *uint64
+ delayed *uint64
stopCh chan struct{}
}
@@ -90,20 +96,36 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
return nil
})
+
if err != nil {
return nil, errors.E(op, err)
}
@@ -114,11 +136,19 @@ func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e
priority: localCfg.Priority,
prefetch: localCfg.Prefetch,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
@@ -139,7 +169,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// add default values
conf.InitDefaults()
- db, err := bolt.Open(pipeline.String(file, "rr.db"), os.FileMode(conf.Permissions), &bolt.Options{
+ db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{
Timeout: time.Second * 20,
NoGrowSync: false,
NoFreelistSync: false,
@@ -155,18 +185,34 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// tx.Commit invokes via the db.Update
err = db.Update(func(tx *bolt.Tx) error {
const upOp = errors.Op("boltdb_plugin_update")
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket))
if err != nil {
return errors.E(op, upOp)
}
- _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DoneBucket))
+
+ _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket))
if err != nil {
return errors.E(op, upOp)
}
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ cursor := inQb.Cursor()
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ // get all items, which are in the InQueueBucket and put them into the PushBucket
+ for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
+ err = pushB.Put(k, v)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
return nil
})
@@ -175,31 +221,74 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
}
return &consumer{
- file: pipeline.String(file, "rr.db"),
+ file: pipeline.String(file, rrDB),
priority: pipeline.Int(priority, 10),
prefetch: pipeline.Int(prefetch, 100),
permissions: conf.Permissions,
+ bPool: sync.Pool{New: func() interface{} {
+ return new(bytes.Buffer)
+ }},
+ cond: sync.NewCond(&sync.Mutex{}),
+
+ delayed: utils.Uint64(0),
+ active: utils.Uint64(0),
+
db: db,
log: log,
eh: e,
pq: pq,
- stopCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}, 2),
}, nil
}
-func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(_ context.Context, job *job.Job) error {
const op = errors.Op("boltdb_jobs_push")
err := c.db.Update(func(tx *bolt.Tx) error {
+ item := fromJob(job)
+
+ // handle delay
+ if item.Options.Delay > 0 {
+ b := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
+ enc := gob.NewEncoder(buf)
+ err := enc.Encode(item)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ atomic.AddUint64(c.delayed, 1)
+
+ return b.Put(utils.AsBytes(tKey), value)
+ }
+
b := tx.Bucket(utils.AsBytes(PushBucket))
- buf := new(bytes.Buffer)
+
+ // pool with buffers
+ buf := c.get()
+ defer c.put(buf)
+
enc := gob.NewEncoder(buf)
- err := enc.Encode(job)
+ err := enc.Encode(item)
if err != nil {
- return err
+ return errors.E(op, err)
}
- return b.Put(utils.AsBytes(uuid.NewString()), buf.Bytes())
+ value := make([]byte, buf.Len())
+ copy(value, buf.Bytes())
+
+ // increment active counter
+ atomic.AddUint64(c.active, 1)
+
+ return b.Put(utils.AsBytes(item.ID()), value)
})
if err != nil {
@@ -221,14 +310,41 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
+
+ // run listener
+ go c.listener()
+ go c.delayedJobsListener()
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
-func (c *consumer) Stop(ctx context.Context) error {
+func (c *consumer) Stop(_ context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
return nil
}
-func (c *consumer) Pause(ctx context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested pause on: ", p)
@@ -242,6 +358,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
}
c.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
atomic.AddUint32(&c.listeners, ^uint32(0))
@@ -253,7 +370,7 @@ func (c *consumer) Pause(ctx context.Context, p string) {
})
}
-func (c *consumer) Resume(ctx context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
c.log.Error("no such pipeline", "requested resume on: ", p)
@@ -268,6 +385,7 @@ func (c *consumer) Resume(ctx context.Context, p string) {
// run listener
go c.listener()
+ go c.delayedJobsListener()
// increase number of listeners
atomic.AddUint32(&c.listeners, 1)
@@ -280,6 +398,30 @@ func (c *consumer) Resume(ctx context.Context, p string) {
})
}
-func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- return nil, nil
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: PushBucket,
+ Active: int64(atomic.LoadUint64(c.active)),
+ Delayed: int64(atomic.LoadUint64(c.delayed)),
+ Ready: toBool(atomic.LoadUint32(&c.listeners)),
+ }, nil
+}
+
+// Private
+
+func (c *consumer) get() *bytes.Buffer {
+ return c.bPool.Get().(*bytes.Buffer)
+}
+
+func (c *consumer) put(b *bytes.Buffer) {
+ b.Reset()
+ c.bPool.Put(b)
+}
+
+func toBool(r uint32) bool {
+ return r > 0
}
diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go
index 8a4aefa3..4f02bb43 100644
--- a/plugins/boltdb/boltjobs/item.go
+++ b/plugins/boltdb/boltjobs/item.go
@@ -1,8 +1,16 @@
package boltjobs
import (
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
+ "time"
+
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
+ "go.etcd.io/bbolt"
)
type Item struct {
@@ -33,6 +41,12 @@ type Options struct {
// Delay defines time duration to delay execution for. Defaults to none.
Delay int64 `json:"delay,omitempty"`
+
+ // private
+ db *bbolt.DB
+
+ active *uint64
+ delayed *uint64
}
func (i *Item) ID() string {
@@ -65,13 +79,150 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_ack")
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ if i.Options.Delay > 0 {
+ atomic.AddUint64(i.Options.delayed, ^uint64(0))
+ } else {
+ atomic.AddUint64(i.Options.active, ^uint64(0))
+ }
+
+ return tx.Commit()
}
func (i *Item) Nack() error {
- panic("implement me")
+	const op = errors.Op("boltdb_item_nack")
+ /*
+ steps:
+ 1. begin tx
+ 2. get item by ID from the InQueueBucket (previously put in the listener)
+ 3. put it back to the PushBucket
+ 4. Delete it from the InQueueBucket
+ */
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ v := inQb.Get(utils.AsBytes(i.ID()))
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ err = pushB.Put(utils.AsBytes(i.ID()), v)
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ _ = tx.Rollback()
+ return errors.E(op, err)
+ }
+
+ return tx.Commit()
}
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
- panic("implement me")
+ const op = errors.Op("boltdb_item_requeue")
+ i.Headers = headers
+ i.Options.Delay = delay
+
+ tx, err := i.Options.db.Begin(true)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ if delay > 0 {
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ tKey := time.Now().Add(time.Second * time.Duration(delay)).Format(time.RFC3339)
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = delayB.Put(utils.AsBytes(tKey), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+ }
+
+ pushB := tx.Bucket(utils.AsBytes(PushBucket))
+
+ buf := new(bytes.Buffer)
+ enc := gob.NewEncoder(buf)
+ err = enc.Encode(i)
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = pushB.Put(utils.AsBytes(i.ID()), buf.Bytes())
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ err = inQb.Delete(utils.AsBytes(i.ID()))
+ if err != nil {
+ return errors.E(op, i.rollback(err, tx))
+ }
+
+ return tx.Commit()
+}
+
+func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
+ i.Options.db = db
+ i.Options.active = active
+ i.Options.delayed = delayed
+}
+
+func (i *Item) rollback(err error, tx *bbolt.Tx) error {
+ errR := tx.Rollback()
+ if errR != nil {
+ return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR)
+ }
+ return errors.Errorf("transaction commit error: %v", err)
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
}
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 2ee06088..d184303a 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -1,34 +1,160 @@
package boltjobs
import (
- "fmt"
+ "bytes"
+ "encoding/gob"
+ "sync/atomic"
"time"
"github.com/spiral/roadrunner/v2/utils"
+ bolt "go.etcd.io/bbolt"
)
func (c *consumer) listener() {
- tt := time.NewTicker(time.Second)
+ tt := time.NewTicker(time.Millisecond * 10)
+ defer tt.Stop()
for {
select {
case <-c.stopCh:
c.log.Warn("boltdb listener stopped")
return
case <-tt.C:
- tx, err := c.db.Begin(false)
+ if atomic.LoadUint64(c.active) >= uint64(c.prefetch) {
+ time.Sleep(time.Second)
+ continue
+ }
+
+ tx, err := c.db.Begin(true)
if err != nil {
- panic(err)
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
}
b := tx.Bucket(utils.AsBytes(PushBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := b.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- cursor := b.Cursor()
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- k, v := cursor.First()
- _ = k
- _ = v
+ // delete key from the PushBucket
+ err = b.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
- fmt.Println("foo")
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
}
}
}
+
+func (c *consumer) delayedJobsListener() {
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for {
+ select {
+ case <-c.stopCh:
+ c.log.Warn("boltdb listener stopped")
+ return
+ case <-tt.C:
+ tx, err := c.db.Begin(true)
+ if err != nil {
+ c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err)
+ continue
+ }
+
+ delayB := tx.Bucket(utils.AsBytes(DelayBucket))
+ inQb := tx.Bucket(utils.AsBytes(InQueueBucket))
+
+ // get first item
+ k, v := delayB.Cursor().First()
+ if k == nil && v == nil {
+ _ = tx.Commit()
+ continue
+ }
+
+ t, err := time.Parse(time.RFC3339, utils.AsString(k))
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ if t.After(time.Now()) {
+ _ = tx.Commit()
+ continue
+ }
+
+ buf := bytes.NewReader(v)
+ dec := gob.NewDecoder(buf)
+
+ item := &Item{}
+ err = dec.Decode(item)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = inQb.Put(utils.AsBytes(item.ID()), v)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // delete key from the PushBucket
+ err = delayB.Delete(k)
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ c.rollback(err, tx)
+ continue
+ }
+
+ // attach pointer to the DB
+ item.attachDB(c.db, c.active, c.delayed)
+ // as the last step, after commit, put the item into the PQ
+ c.pq.Insert(item)
+ }
+ }
+}
+
+func (c *consumer) rollback(err error, tx *bolt.Tx) {
+ errR := tx.Rollback()
+ if errR != nil {
+ c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR)
+ }
+
+ c.log.Error("transaction commit error, rollback succeed", "error", err)
+}
diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio
new file mode 100644
index 00000000..feeccae0
--- /dev/null
+++ b/plugins/boltdb/doc/boltjobs.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-08-30T08:11:04.405Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="mdoOmMZM5sMC4nWvZBy0" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">ddHBDsIgDADQr+GOEPcDc+rF0w6eyahAwtaFsWz69W4ZiGR6orwWSgrhZTtfnOj1DSVYwqicCT8Rxg7HoliWVZ5BKAuinJHBEtTmBbEw6GgkDFmhR7Te9Dk22HXQ+MyEczjlZQ+0eddeKNhB3Qi717uRXgctKE2JKxilY2sWM62I1QEGLSROX8QrwkuH6LeonUuw6/jiYLZz5z/Zz8scdP7HgSVIdy+b7I949QY=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md
new file mode 100644
index 00000000..317aec90
--- /dev/null
+++ b/plugins/boltdb/doc/job_lifecycle.md
@@ -0,0 +1,10 @@
+### Job lifecycle
+
+There are several boltdb buckets:
+
+1. `PushBucket` - used for pushed jobs via RPC.
+2. `InQueueBucket` - when the job is consumed from the `PushBucket`, in the same transaction, it is copied into the priority queue and
+put into the `InQueueBucket`, where it waits for acknowledgement.
+3. `DelayBucket` - used for delayed jobs. An RFC3339 timestamp is used as the key to track delay expiration.
+
+
diff --git a/plugins/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index 91b8eda9..8870bb0f 100644
--- a/plugins/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
@@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- _, ok := j.pipeline.Load().(*pipeline.Pipeline)
+ _, ok := c.pipeline.Load().(*pipeline.Pipeline)
if !ok {
return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
@@ -105,42 +105,42 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *consumer) State(_ context.Context) (*jobState.State, error) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) State(_ context.Context) (*jobState.State, error) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: pipe.Name(),
- Active: atomic.LoadInt64(j.active),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Active: atomic.LoadInt64(c.active),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
}
-func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ c.pipeline.Store(pipeline)
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop the consumer
- j.stopCh <- struct{}{}
+ c.stopCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -149,24 +149,24 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// listener already active
if l == 1 {
- j.log.Warn("listener already in the active state")
+ c.log.Warn("listener already in the active state")
return
}
// resume the consumer on the same channel
- j.consume()
+ c.consume()
- atomic.StoreUint32(&j.listeners, 1)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Start: time.Now(),
@@ -175,8 +175,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
- j.eh.Push(events.JobEvent{
+func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -185,84 +185,79 @@ func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(ctx context.Context) error {
- const op = errors.Op("ephemeral_plugin_stop")
+func (c *consumer) Stop(_ context.Context) error {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- select {
- // return from the consumer
- case j.stopCh <- struct{}{}:
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe.Name(),
- Start: time.Now(),
- Elapsed: 0,
- })
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
- return nil
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ Elapsed: 0,
+ })
- case <-ctx.Done():
- return errors.E(op, ctx.Err())
- }
+ return nil
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
// goroutines here. We should limit goroutines here.
if msg.Options.Delay > 0 {
// if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ if atomic.LoadUint64(&c.goroutines) >= goroutinesMax {
return errors.E(op, errors.Str("max concurrency number reached"))
}
go func(jj *Item) {
- atomic.AddUint64(&j.goroutines, 1)
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddUint64(&c.goroutines, 1)
+ atomic.AddInt64(c.delayed, 1)
time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localPrefetch <- jj
+ c.localPrefetch <- jj
- atomic.AddUint64(&j.goroutines, ^uint64(0))
+ atomic.AddUint64(&c.goroutines, ^uint64(0))
}(msg)
return nil
}
// increase number of the active jobs
- atomic.AddInt64(j.active, 1)
+ atomic.AddInt64(c.active, 1)
// insert to the local, limited pipeline
select {
- case j.localPrefetch <- msg:
+ case c.localPrefetch <- msg:
return nil
case <-ctx.Done():
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err()))
}
}
-func (j *consumer) consume() {
+func (c *consumer) consume() {
go func() {
// redirect
for {
select {
- case item, ok := <-j.localPrefetch:
+ case item, ok := <-c.localPrefetch:
if !ok {
- j.log.Warn("ephemeral local prefetch queue was closed")
+ c.log.Warn("ephemeral local prefetch queue was closed")
return
}
// set requeue channel
- item.Options.requeueFn = j.handleItem
- item.Options.active = j.active
- item.Options.delayed = j.delayed
+ item.Options.requeueFn = c.handleItem
+ item.Options.active = c.active
+ item.Options.delayed = c.delayed
- j.pq.Insert(item)
- case <-j.stopCh:
+ c.pq.Insert(item)
+ case <-c.stopCh:
return
}
}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index a0b477f9..236aded3 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -177,6 +177,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return true
})
+ // do not continue processing, immediately stop if channel contains an error
+ if len(errCh) > 0 {
+ return errCh
+ }
+
var err error
p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs})
if err != nil {
@@ -279,6 +284,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
Start: start,
Elapsed: time.Since(start),
})
+
+ continue
}
// handle the response protocol
@@ -330,6 +337,10 @@ func (p *Plugin) Stop() error {
cancel()
}
+ p.Lock()
+ p.workersPool.Destroy(context.Background())
+ p.Unlock()
+
// this function can block forever, but we don't care, because we might have a chance to exit from the pollers,
// but if not, this is not a problem at all.
// The main target is to stop the drivers
@@ -342,7 +353,6 @@ func (p *Plugin) Stop() error {
// just wait pollers for 5 seconds before exit
time.Sleep(time.Second * 5)
-
return nil
}
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..dfbda154 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
+func (c *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != jb.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
@@ -243,17 +243,17 @@ func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- err := j.handleItem(ctx, fromJob(jb))
+ err := c.handleItem(ctx, fromJob(jb))
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("sqs_state")
- attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
- QueueUrl: j.queueURL,
+ attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: c.queueURL,
AttributeNames: []types.QueueAttributeName{
types.QueueAttributeNameApproximateNumberOfMessages,
types.QueueAttributeNameApproximateNumberOfMessagesDelayed,
@@ -265,13 +265,13 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
out := &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
- Queue: *j.queueURL,
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Queue: *c.queueURL,
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}
nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)])
@@ -292,28 +292,28 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
return out, nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("sqs_run")
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -323,11 +323,13 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) Stop(context.Context) error {
- j.pauseCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.pauseCh <- struct{}{}
+ }
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -336,27 +338,27 @@ func (j *consumer) Stop(context.Context) error {
return nil
}
-func (j *consumer) Pause(_ context.Context, p string) {
+func (c *consumer) Pause(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// stop consume
- j.pauseCh <- struct{}{}
+ c.pauseCh <- struct{}{}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -364,28 +366,28 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
+func (c *consumer) Resume(_ context.Context, p string) {
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
return
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("sqs listener already in the active state")
+ c.log.Warn("sqs listener already in the active state")
return
}
// start listener
- go j.listen(context.Background())
+ go c.listen(context.Background())
// increase num of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -393,12 +395,12 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
- d, err := msg.pack(j.queueURL)
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(c.queueURL)
if err != nil {
return err
}
- _, err = j.client.SendMessage(ctx, d)
+ _, err = c.client.SendMessage(ctx, d)
if err != nil {
return err
}
diff --git a/plugins/sqs/item.go b/plugins/sqs/item.go
index 996adf6c..4e33e99e 100644
--- a/plugins/sqs/item.go
+++ b/plugins/sqs/item.go
@@ -192,7 +192,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
}, nil
}
-func (j *consumer) unpack(msg *types.Message) (*Item, error) {
+func (c *consumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -236,10 +236,10 @@ func (j *consumer) unpack(msg *types.Message) (*Item, error) {
// private
approxReceiveCount: int64(recCount),
- client: j.client,
- queue: j.queueURL,
+ client: c.client,
+ queue: c.queueURL,
receiptHandler: msg.ReceiptHandle,
- requeueFn: j.handleItem,
+ requeueFn: c.handleItem,
},
}
diff --git a/plugins/sqs/listener.go b/plugins/sqs/listener.go
index a4280af2..215dd6a5 100644
--- a/plugins/sqs/listener.go
+++ b/plugins/sqs/listener.go
@@ -18,22 +18,22 @@ const (
NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
+func (c *consumer) listen(ctx context.Context) { //nolint:gocognit
for {
select {
- case <-j.pauseCh:
- j.log.Warn("sqs listener stopped")
+ case <-c.pauseCh:
+ c.log.Warn("sqs listener stopped")
return
default:
- message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
- QueueUrl: j.queueURL,
- MaxNumberOfMessages: j.prefetch,
+ message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: c.queueURL,
+ MaxNumberOfMessages: c.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- VisibilityTimeout: j.visibilityTimeout,
- WaitTimeSeconds: j.waitTime,
+ VisibilityTimeout: c.visibilityTimeout,
+ WaitTimeSeconds: c.waitTime,
})
if err != nil {
@@ -42,10 +42,10 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
// in case of NonExistentQueue - recreate the queue
if apiErr.Code == NonExistentQueue {
- j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
- _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags})
if err != nil {
- j.log.Error("create queue", "error", err)
+ c.log.Error("create queue", "error", err)
}
// To successfully create a new queue, you must provide a
// queue name that adheres to the limits related to the queues
@@ -60,27 +60,27 @@ func (j *consumer) listen(ctx context.Context) { //nolint:gocognit
}
}
- j.log.Error("receive message", "error", err)
+ c.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := j.unpack(&m)
+ item, err := c.unpack(&m)
if err != nil {
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
+ _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: c.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ c.log.Error("message unpack, failed to delete the message from the queue", "error", err)
}
- j.log.Error("message unpack", "error", err)
+ c.log.Error("message unpack", "error", err)
continue
}
- j.pq.Insert(item)
+ c.pq.Insert(item)
}
}
}
diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
index 87f46069..92d090d4 100644
--- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
+++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml
@@ -29,3 +29,6 @@ jobs:
reserve_timeout: 10s
consume: [ "test-1" ]
+
+endure:
+ log_level: debug
diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml
index 1c09bef9..54aaf3c6 100644
--- a/tests/plugins/jobs/boltdb/.rr-no-global.yaml
+++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml
@@ -26,15 +26,15 @@ jobs:
pipelines:
test-1:
driver: boltdb
- prefetch: 100
- file: "rr1.db"
- priority: 1
+ prefetch: 100
+ file: "rr1.db"
+ priority: 1
test-2:
driver: boltdb
- prefetch: 100
- file: "rr2.db"
- priority: 1
+ prefetch: 100
+ file: "rr2.db"
+ priority: 1
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
consume: [ "test-1", "test-2" ]
diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go
index 5067ef9f..6c2d05ca 100644
--- a/tests/plugins/jobs/helpers.go
+++ b/tests/plugins/jobs/helpers.go
@@ -95,7 +95,7 @@ func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) {
req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{
Job: "some/php/namespace",
- Id: "1",
+ Id: "2",
Payload: `{"hello":"world"}`,
Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}},
Options: &jobsv1beta.Options{
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index 9f4d37ec..78d154b1 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -466,7 +466,7 @@ func TestBeanstalkStats(t *testing.T) {
}
func TestBeanstalkNoGlobalSection(t *testing.T) {
- cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
assert.NoError(t, err)
cfg := &config.Viper{
diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go
index cf3e5a91..15d2bce8 100644
--- a/tests/plugins/jobs/jobs_boltdb_test.go
+++ b/tests/plugins/jobs/jobs_boltdb_test.go
@@ -12,6 +12,7 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/boltdb"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
@@ -22,6 +23,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
const (
@@ -222,266 +224,268 @@ func TestBoltDBDeclare(t *testing.T) {
assert.NoError(t, os.Remove(rr1db))
}
-//
-//func TestAMQPJobsError(t *testing.T) {
-// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
-// assert.NoError(t, err)
-//
-// cfg := &config.Viper{
-// Path: "amqp/.rr-amqp-jobs-err.yaml",
-// Prefix: "rr",
-// }
-//
-// controller := gomock.NewController(t)
-// mockLogger := mocks.NewMockLogger(controller)
-//
-// // general
-// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
-// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
-// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
-//
-// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-//
-// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
-// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1)
-//
-// err = cont.RegisterAll(
-// cfg,
-// &server.Plugin{},
-// &rpcPlugin.Plugin{},
-// mockLogger,
-// &jobs.Plugin{},
-// &resetter.Plugin{},
-// &informer.Plugin{},
-// &amqp.Plugin{},
-// )
-// assert.NoError(t, err)
-//
-// err = cont.Init()
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// ch, err := cont.Serve()
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// sig := make(chan os.Signal, 1)
-// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-//
-// wg := &sync.WaitGroup{}
-// wg.Add(1)
-//
-// stopCh := make(chan struct{}, 1)
-//
-// go func() {
-// defer wg.Done()
-// for {
-// select {
-// case e := <-ch:
-// assert.Fail(t, "error", e.Error.Error())
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// case <-sig:
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// return
-// case <-stopCh:
-// // timeout
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// return
-// }
-// }
-// }()
-//
-// time.Sleep(time.Second * 3)
-//
-// t.Run("DeclareAMQPPipeline", declareAMQPPipe)
-// t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
-// t.Run("PushAMQPPipeline", pushToPipe("test-3"))
-// time.Sleep(time.Second * 25)
-// t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
-// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
-//
-// time.Sleep(time.Second * 5)
-// stopCh <- struct{}{}
-// wg.Wait()
-//}
-//
-//func TestAMQPNoGlobalSection(t *testing.T) {
-// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
-// assert.NoError(t, err)
-//
-// cfg := &config.Viper{
-// Path: "amqp/.rr-no-global.yaml",
-// Prefix: "rr",
-// }
-//
-// err = cont.RegisterAll(
-// cfg,
-// &server.Plugin{},
-// &rpcPlugin.Plugin{},
-// &logger.ZapLogger{},
-// &jobs.Plugin{},
-// &resetter.Plugin{},
-// &informer.Plugin{},
-// &amqp.Plugin{},
-// )
-// assert.NoError(t, err)
-//
-// err = cont.Init()
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// _, err = cont.Serve()
-// require.Error(t, err)
-//}
-//
-//func TestAMQPStats(t *testing.T) {
-// cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
-// assert.NoError(t, err)
-//
-// cfg := &config.Viper{
-// Path: "amqp/.rr-amqp-declare.yaml",
-// Prefix: "rr",
-// }
-//
-// controller := gomock.NewController(t)
-// mockLogger := mocks.NewMockLogger(controller)
-//
-// // general
-// mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
-// mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
-// mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
-//
-// mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-// mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
-// mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "amqp", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-// mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
-// mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
-// mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes()
-//
-// err = cont.RegisterAll(
-// cfg,
-// &server.Plugin{},
-// &rpcPlugin.Plugin{},
-// mockLogger,
-// &jobs.Plugin{},
-// &resetter.Plugin{},
-// &informer.Plugin{},
-// &amqp.Plugin{},
-// )
-// assert.NoError(t, err)
-//
-// err = cont.Init()
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// ch, err := cont.Serve()
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// sig := make(chan os.Signal, 1)
-// signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-//
-// wg := &sync.WaitGroup{}
-// wg.Add(1)
-//
-// stopCh := make(chan struct{}, 1)
-//
-// go func() {
-// defer wg.Done()
-// for {
-// select {
-// case e := <-ch:
-// assert.Fail(t, "error", e.Error.Error())
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// case <-sig:
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// return
-// case <-stopCh:
-// // timeout
-// err = cont.Stop()
-// if err != nil {
-// assert.FailNow(t, "error", err.Error())
-// }
-// return
-// }
-// }
-// }()
-//
-// time.Sleep(time.Second * 3)
-//
-// t.Run("DeclareAMQPPipeline", declareAMQPPipe)
-// t.Run("ConsumeAMQPPipeline", resumePipes("test-3"))
-// t.Run("PushAMQPPipeline", pushToPipe("test-3"))
-// time.Sleep(time.Second * 2)
-// t.Run("PauseAMQPPipeline", pausePipelines("test-3"))
-// time.Sleep(time.Second * 2)
-// t.Run("PushAMQPPipeline", pushToPipe("test-3"))
-// t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
-//
-// out := &jobState.State{}
-// t.Run("Stats", stats(out))
-//
-// assert.Equal(t, out.Pipeline, "test-3")
-// assert.Equal(t, out.Driver, "amqp")
-// assert.Equal(t, out.Queue, "default")
-//
-// assert.Equal(t, int64(1), out.Active)
-// assert.Equal(t, int64(1), out.Delayed)
-// assert.Equal(t, int64(0), out.Reserved)
-// assert.Equal(t, false, out.Ready)
-//
-// time.Sleep(time.Second)
-// t.Run("ResumePipeline", resumePipes("test-3"))
-// time.Sleep(time.Second * 7)
-//
-// out = &jobState.State{}
-// t.Run("Stats", stats(out))
-//
-// assert.Equal(t, out.Pipeline, "test-3")
-// assert.Equal(t, out.Driver, "amqp")
-// assert.Equal(t, out.Queue, "default")
-//
-// assert.Equal(t, int64(0), out.Active)
-// assert.Equal(t, int64(0), out.Delayed)
-// assert.Equal(t, int64(0), out.Reserved)
-// assert.Equal(t, true, out.Ready)
-//
-// time.Sleep(time.Second)
-// t.Run("DestroyAMQPPipeline", destroyPipelines("test-3"))
-//
-// time.Sleep(time.Second * 5)
-// stopCh <- struct{}{}
-// wg.Wait()
-//}
-//
+func TestBoltDBJobsError(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "boltdb/.rr-boltdb-jobs-err.yaml",
+ Prefix: "rr",
+ }
+
+ //controller := gomock.NewController(t)
+ //mockLogger := mocks.NewMockLogger(controller)
+ //
+ //// general
+ //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ //
+ //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //
+ //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+	//mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3)
+ //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").Times(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ //mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &boltdb.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclarePipeline", declareBoltDBPipe(rr1db))
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 25)
+ t.Run("PausePipeline", pausePipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+ assert.NoError(t, os.Remove(rr1db))
+}
+
+func TestBoltDBNoGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "boltdb/.rr-no-global.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &boltdb.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = cont.Serve()
+ require.Error(t, err)
+}
+
+func TestBoltDBStats(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "boltdb/.rr-boltdb-declare.yaml",
+ Prefix: "rr",
+ }
+
+ //controller := gomock.NewController(t)
+ //mockLogger := mocks.NewMockLogger(controller)
+ //
+ //// general
+ //mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ //mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ //mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ //
+ //mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2)
+	//mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+ //mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1)
+ //mockLogger.EXPECT().Info("delivery channel closed, leaving the rabbit listener").AnyTimes()
+
+ err = cont.RegisterAll(
+ cfg,
+ &server.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ //mockLogger,
+ &jobs.Plugin{},
+ &resetter.Plugin{},
+ &informer.Plugin{},
+ &boltdb.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 3)
+
+ t.Run("DeclarePipeline", declareBoltDBPipe(rr1db))
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PausePipeline", pausePipelines("test-3"))
+ time.Sleep(time.Second * 2)
+ t.Run("PushPipeline", pushToPipe("test-3"))
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
+
+ out := &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, "test-3", out.Pipeline)
+ assert.Equal(t, "boltdb", out.Driver)
+ assert.Equal(t, "push", out.Queue)
+
+ assert.Equal(t, int64(1), out.Active)
+ assert.Equal(t, int64(1), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+ assert.Equal(t, false, out.Ready)
+
+ time.Sleep(time.Second)
+ t.Run("ResumePipeline", resumePipes("test-3"))
+ time.Sleep(time.Second * 7)
+
+ out = &jobState.State{}
+ t.Run("Stats", stats(out))
+
+ assert.Equal(t, "test-3", out.Pipeline)
+ assert.Equal(t, "boltdb", out.Driver)
+ assert.Equal(t, "push", out.Queue)
+
+ assert.Equal(t, int64(0), out.Active)
+ assert.Equal(t, int64(0), out.Delayed)
+ assert.Equal(t, int64(0), out.Reserved)
+ assert.Equal(t, true, out.Ready)
+
+ time.Sleep(time.Second)
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
+
+ time.Sleep(time.Second * 5)
+ stopCh <- struct{}{}
+ wg.Wait()
+ assert.NoError(t, os.Remove(rr1db))
+}
func declareBoltDBPipe(file string) func(t *testing.T) {
return func(t *testing.T) {
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index 91855ee9..951d6227 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -171,9 +171,12 @@ func TestJOBSMetrics(t *testing.T) {
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
tt := time.NewTimer(time.Minute * 3)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
go func() {
defer tt.Stop()
+ defer wg.Done()
for {
select {
case e := <-ch:
@@ -220,7 +223,7 @@ func TestJOBSMetrics(t *testing.T) {
assert.Contains(t, genericOut, "workers_memory_bytes")
close(sig)
- time.Sleep(time.Second * 2)
+ wg.Wait()
}
const getAddr = "http://127.0.0.1:2112/metrics"
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index 95abe9dc..2dd2c8db 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -437,15 +437,15 @@ func TestSQSStat(t *testing.T) {
time.Sleep(time.Second * 3)
- t.Run("DeclareSQSPipeline", declareSQSPipe)
- t.Run("ConsumeSQSPipeline", resumePipes("test-3"))
- t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ t.Run("DeclarePipeline", declareSQSPipe)
+ t.Run("ConsumePipeline", resumePipes("test-3"))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
- t.Run("PauseSQSPipeline", pausePipelines("test-3"))
+ t.Run("PausePipeline", pausePipelines("test-3"))
time.Sleep(time.Second)
- t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5))
- t.Run("PushSQSPipeline", pushToPipe("test-3"))
+ t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5))
+ t.Run("PushPipeline", pushToPipe("test-3"))
time.Sleep(time.Second)
out := &jobState.State{}
@@ -474,7 +474,7 @@ func TestSQSStat(t *testing.T) {
assert.Equal(t, int64(0), out.Delayed)
assert.Equal(t, int64(0), out.Reserved)
- t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3"))
+ t.Run("DestroyPipeline", destroyPipelines("test-3"))
time.Sleep(time.Second * 5)
stopCh <- struct{}{}