diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 15:31:30 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-31 15:31:30 +0300 |
commit | 83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch) | |
tree | 884dd2991acf12826752632b8321410e7cc923ce | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) | |
parent | 31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff) |
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
-rw-r--r-- | CHANGELOG.md | 5 | ||||
-rw-r--r-- | go.mod | 28 | ||||
-rw-r--r-- | go.sum | 65 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 6 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 50 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 2 | ||||
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 5 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 105 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/config.go) | 2 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/consumer.go) | 166 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/item.go) | 20 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/listener.go) | 12 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/rabbit_init.go) | 18 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/redial.go) | 74 | ||||
-rw-r--r-- | plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go) | 5 | ||||
-rw-r--r-- | plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go) | 26 | ||||
-rw-r--r-- | plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go) | 2 | ||||
-rw-r--r-- | plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go) | 2 | ||||
-rw-r--r-- | plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/config.go | 39 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 422 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/item.go | 229 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 151 | ||||
-rw-r--r-- | plugins/boltdb/boltkv/config.go (renamed from plugins/kv/drivers/boltdb/config.go) | 2 | ||||
-rw-r--r-- | plugins/boltdb/boltkv/driver.go (renamed from plugins/kv/drivers/boltdb/driver.go) | 15 | ||||
-rw-r--r-- | plugins/boltdb/doc/boltjobs.drawio | 1 | ||||
-rw-r--r-- | plugins/boltdb/doc/job_lifecycle.md | 10 | ||||
-rw-r--r-- | plugins/boltdb/plugin.go | 82 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 63 | ||||
-rw-r--r-- | plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go) | 129 | ||||
-rw-r--r-- | plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go) | 0 | ||||
-rw-r--r-- | plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/jobs/job/job.go (renamed from plugins/jobs/job/general.go) | 33 | ||||
-rw-r--r-- | plugins/jobs/job/job_options.go | 32 | ||||
-rw-r--r-- | plugins/jobs/job/job_test.go (renamed from plugins/jobs/job/job_options_test.go) | 0 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 18 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 71 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 118 | ||||
-rw-r--r-- | plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/resetter/plugin.go | 2 | ||||
-rw-r--r-- | plugins/server/plugin.go | 2 | ||||
-rw-r--r-- | plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go) | 94 | ||||
-rw-r--r-- | plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go) | 11 | ||||
-rw-r--r-- | plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go) | 36 | ||||
-rw-r--r-- | plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/status/plugin.go | 4 | ||||
-rw-r--r-- | tests/allocate-failed.php | 18 | ||||
-rw-r--r-- | tests/plugins/jobs/beanstalk/.rr-no-global.yaml | 3 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml | 43 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml | 24 | ||||
-rw-r--r-- | tests/plugins/jobs/boltdb/.rr-no-global.yaml | 41 | ||||
-rw-r--r-- | tests/plugins/jobs/helpers.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_boltdb_test.go | 506 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 9 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 16 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_with_toxics_test.go | 6 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 4 |
67 files changed, 2200 insertions, 659 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e29bdb8..93bcf13b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,12 @@ v2.4.0 (_.08.2021) ## 👀 New: - ✏️ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. - Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral`. [PR](https://github.com/spiral/roadrunner/pull/726) + Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `ephemeral` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) - Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) ## 🩹 Fixes: -- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel. +- 🐛 Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750). +- 🐛 Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). ## 📈 Summary: @@ -6,11 +6,11 @@ require ( github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/alicebob/miniredis/v2 v2.15.1 // ========= AWS SDK v2 - github.com/aws/aws-sdk-go-v2 v1.8.1 - github.com/aws/aws-sdk-go-v2/config v1.6.1 - github.com/aws/aws-sdk-go-v2/credentials v1.3.3 - github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2 - github.com/aws/smithy-go v1.7.0 + github.com/aws/aws-sdk-go-v2 v1.9.0 + github.com/aws/aws-sdk-go-v2/config v1.7.0 + github.com/aws/aws-sdk-go-v2/credentials v1.4.0 + github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0 + github.com/aws/smithy-go v1.8.0 // ===================== github.com/beanstalkd/go-beanstalk v0.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b @@ -18,11 +18,11 @@ require ( github.com/fasthttp/websocket v1.4.3 github.com/fatih/color v1.12.0 github.com/go-redis/redis/v8 v8.11.3 - github.com/gofiber/fiber/v2 v2.17.0 + github.com/gofiber/fiber/v2 v2.18.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/json-iterator/go v1.1.11 - github.com/klauspost/compress v1.13.4 + github.com/klauspost/compress v1.13.5 github.com/prometheus/client_golang v1.11.0 github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 github.com/shirou/gopsutil v3.21.7+incompatible @@ -38,7 +38,7 @@ require ( go.etcd.io/bbolt v1.3.6 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.0 - golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d + golang.org/x/net v0.0.0-20210825183410-e898025ed96a golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf google.golang.org/protobuf v1.27.1 @@ -49,11 +49,11 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/andybalholm/brotli v1.0.3 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.6.2 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.4.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.7.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -80,7 +80,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.2.0 // indirect - github.com/tklauser/go-sysconf v0.3.8 // indirect + github.com/tklauser/go-sysconf v0.3.9 // indirect github.com/tklauser/numcpus v0.3.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.29.0 // indirect @@ -61,26 +61,26 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/aws/aws-sdk-go-v2 v1.8.1 h1:GcFgQl7MsBygmeeqXyV1ivrTEmsVz/rdFJaTcltG9ag= -github.com/aws/aws-sdk-go-v2 v1.8.1/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0= -github.com/aws/aws-sdk-go-v2/config v1.6.1 h1:qrZINaORyr78syO1zfD4l7r4tZjy0Z1l0sy4jiysyOM= -github.com/aws/aws-sdk-go-v2/config v1.6.1/go.mod h1:t/y3UPu0XEDy0cEw6mvygaBQaPzWiYAxfP2SzgtvclA= -github.com/aws/aws-sdk-go-v2/credentials v1.3.3 h1:A13QPatmUl41SqUfnuT3V0E3XiNGL6qNTOINbE8cZL4= -github.com/aws/aws-sdk-go-v2/credentials v1.3.3/go.mod h1:oVieKMT3m9BSfqhOfuQ+E0j/yN84ZAJ7Qv8Sfume/ak= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1 h1:rc+fRGvlKbeSd9IFhFS1KWBs0XjTkq0CfK5xqyLgIp0= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.1/go.mod h1:+GTydg3uHmVlQdkRoetz6VHKbOMEYof70m19IpMLifc= -github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1 h1:IkqRRUZTKaS16P2vpX+FNc2jq3JWa3c478gykQp4ow4= -github.com/aws/aws-sdk-go-v2/internal/ini v1.2.1/go.mod h1:Pv3WenDjI0v2Jl7UaMFIIbPOBbhn33RmmAmGgkXDoqY= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3 h1:VxFCgxsqWe7OThOwJ5IpFX3xrObtuIH9Hg/NW7oot1Y= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.3/go.mod h1:7gcsONBmFoCcKrAqrm95trrMd2+C/ReYKP7Vfu8yHHA= -github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2 h1:RFCFJzkGKSpVQZiTyMgoW3V7/uFIUFip5t6ljvD+Uo0= -github.com/aws/aws-sdk-go-v2/service/sqs v1.7.2/go.mod h1:TGLWOGp2jII8DZhzRUQXcrsYMvk7fqz8zYdNPq4YQ8Y= -github.com/aws/aws-sdk-go-v2/service/sso v1.3.3 h1:K2gCnGvAASpz+jqP9iyr+F/KNjmTYf8aWOtTQzhmZ5w= -github.com/aws/aws-sdk-go-v2/service/sso v1.3.3/go.mod h1:Jgw5O+SK7MZ2Yi9Yvzb4PggAPYaFSliiQuWR0hNjexk= -github.com/aws/aws-sdk-go-v2/service/sts v1.6.2 h1:l504GWCoQi1Pk68vSUFGLmDIEMzRfVGNgLakDK+Uj58= -github.com/aws/aws-sdk-go-v2/service/sts v1.6.2/go.mod h1:RBhoMJB8yFToaCnbe0jNq5Dcdy0jp6LhHqg55rjClkM= -github.com/aws/smithy-go v1.7.0 h1:+cLHMRrDZvQ4wk+KuQ9yH6eEg6KZEJ9RI2IkDqnygCg= -github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= +github.com/aws/aws-sdk-go-v2/config v1.7.0 h1:J2cZ7qe+3IpqBEXnHUrFrOjoB9BlsXg7j53vxcl5IVg= +github.com/aws/aws-sdk-go-v2/config v1.7.0/go.mod h1:w9+nMZ7soXCe5nT46Ri354SNhXDQ6v+V5wqDjnZE+GY= +github.com/aws/aws-sdk-go-v2/credentials v1.4.0 h1:kmvesfjY861FzlCU9mvAfe01D9aeXcG2ZuC+k9F2YLM= +github.com/aws/aws-sdk-go-v2/credentials v1.4.0/go.mod h1:dgGR+Qq7Wjcd4AOAW5Rf5Tnv3+x7ed6kETXyS9WCuAY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0 h1:OxTAgH8Y4BXHD6PGCJ8DHx2kaZPCQfSTqmDsdRZFezE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.5.0/go.mod h1:CpNzHK9VEFUCknu50kkB8z58AH2B5DvPP7ea1LHve/Y= +github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2 h1:d95cddM3yTm4qffj3P6EnP+TzX1SSkWaQypXSgT/hpA= +github.com/aws/aws-sdk-go-v2/internal/ini v1.2.2/go.mod h1:BQV0agm+JEhqR+2RT5e1XTFIDcAAV0eW6z2trp+iduw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0 h1:VNJ5NLBteVXEwE2F1zEXVmyIH58mZ6kIQGJoC7C+vkg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.0/go.mod h1:R1KK+vY8AfalhG1AOu5e35pOD2SdoPKQCFLTvnxiohk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0 h1:BI05Jbkaqp5IDxiobr3B59mX07lfpLJDv5NwAEx3wSs= +github.com/aws/aws-sdk-go-v2/service/sqs v1.8.0/go.mod h1:BXA1CVaEd9TBOQ8G2ke7lMWdVggAeh35+h2HDO50z7s= +github.com/aws/aws-sdk-go-v2/service/sso v1.4.0 h1:sHXMIKYS6YiLPzmKSvDpPmOpJDHxmAUgbiF49YNVztg= +github.com/aws/aws-sdk-go-v2/service/sso v1.4.0/go.mod h1:+1fpWnL96DL23aXPpMGbsmKe8jLTEfbjuQoA4WS1VaA= +github.com/aws/aws-sdk-go-v2/service/sts v1.7.0 h1:1at4e5P+lvHNl2nUktdM2/v+rpICg/QSEr9TO/uW9vU= +github.com/aws/aws-sdk-go-v2/service/sts v1.7.0/go.mod h1:0qcSMCyASQPN2sk/1KQLQ2Fh6yq8wm0HSDAimPhzCoM= +github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc= +github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w= github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -96,7 +96,6 @@ github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQ github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -128,8 +127,6 @@ github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.5.0 h1:NO5hkcB+srp1x6QmwvNZLeaOgbM8cmBTN32THzjvu2k= -github.com/fsnotify/fsnotify v1.5.0/go.mod h1:BX0DCEr5pT4jm2CnQdVP1lFV521fcCNcyEeNp4DQQDk= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -150,8 +147,8 @@ github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gofiber/fiber/v2 v2.17.0 h1:qP3PkGUbBB0i9iQh5E057XI1yO5CZigUxZhyUFYAFoM= -github.com/gofiber/fiber/v2 v2.17.0/go.mod h1:iftruuHGkRYGEXVISmdD7HTYWyfS2Bh+Dkfq4n/1Owg= +github.com/gofiber/fiber/v2 v2.18.0 h1:xCWYSVoTNibHpzfciPwUSZGiTyTpTXYchCwynuJU09s= +github.com/gofiber/fiber/v2 v2.18.0/go.mod h1:/LdZHMUXZvTTo7gU4+b1hclqCAdoQphNQ9bi9gutPyI= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -268,9 +265,9 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4= +github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -352,8 +349,6 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/rabbitmq/amqp091-go v0.0.0-20210812094702-b2a427eb7d17 h1:3HQ5TTZU56EjMWPU0K0Nqz1aakOLQUwV4lZ6tNxtXvc= -github.com/rabbitmq/amqp091-go v0.0.0-20210812094702-b2a427eb7d17/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 h1:13nv5f/LNJxNpvpYm/u0NqrlFebon342f9Xu9GpklKc= github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -401,15 +396,13 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= -github.com/tklauser/go-sysconf v0.3.8 h1:41Nq9J+pxKud4IQ830J5LlS5nl67dVQC7AuisUooaOU= -github.com/tklauser/go-sysconf v0.3.8/go.mod h1:z4zYWRS+X53WUKtBcmDg1comV3fPhdQnzasnIHUoLDU= -github.com/tklauser/numcpus v0.2.3/go.mod h1:vpEPS/JC+oZGGQ/My/vJnNsvMDQL6PwOqt8dsCw5j+E= +github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= +github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.14.0/go.mod h1:ol1PCaL0dX20wC0htZ7sYCsvCYmrouYra0zHzaclZhE= -github.com/valyala/fasthttp v1.26.0/go.mod h1:cmWIqlu99AO/RKcp1HWaViTqc57FswJOfYYdPJBl8BA= github.com/valyala/fasthttp v1.29.0 h1:F5GKpytwFk5OhCuRh6H+d4vZAcEeNAwPTdwQnm6IERY= github.com/valyala/fasthttp v1.29.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= @@ -541,8 +534,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= -golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -629,8 +622,6 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55 h1:rw6UNGRMfarCepjI8qOepea/SXwIBVfTKjztZ5gBbq4= -golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 3eb0714f..7e190846 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) @@ -329,7 +329,9 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { return nil, errors.E(op, err) } - err = sw.Stop() + // destroy the worker + sw.State().Set(worker.StateDestroyed) + err = sw.Kill() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) return nil, errors.E(op, err) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index d1b24574..14df513e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -2,6 +2,7 @@ package pool import ( "context" + "os" "os/exec" "testing" "time" @@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { <-block p.Destroy(context.Background()) } + +func TestSupervisedPool_AllocateFailedOK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(2), + AllocateTimeout: time.Second * 15, + DestroyTimeout: time.Second * 5, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + // should be ok + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + require.NoError(t, err) + + // after creating this file, PHP will fail + file, err := os.Create("break") + require.NoError(t, err) + + time.Sleep(time.Second * 5) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Remove("break")) + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "panic should not be fired!") + } else { + p.Destroy(context.Background()) + } + }() +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2044d0e7..fa74e7b5 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -194,7 +194,7 @@ func (w *Process) Stop() error { } // Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not waits for process completion! +// error log from the stderr. Does not wait for process completion! func (w *Process) Kill() error { if w.State().Value() == StateDestroyed { err := w.cmd.Process.Signal(os.Kill) diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 7fb65a92..5605f1e0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -15,14 +15,11 @@ type Vec struct { destroy uint64 // channel with the workers workers chan worker.BaseProcess - - len uint64 } func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - len: len, workers: make(chan worker.BaseProcess, len), } @@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) { 1. TTL is set with no requests during the TTL 2. Violated Get <-> Release operation (how ??) */ - for i := uint64(0); i < v.len; i++ { + for i := 0; i < len(v.workers); i++ { /* We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. */ diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348be199..83f8e627 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck import ( "context" "sync" + "sync/atomic" "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" + "github.com/spiral/roadrunner/v2/utils" ) // Vector interface represents vector container @@ -30,21 +32,24 @@ type workerWatcher struct { sync.RWMutex container Vector // used to control Destroy stage (that all workers are in the container) - numWorkers uint64 + numWorkers *uint64 workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler + allocator worker.Allocator + allocateTimeout time.Duration + events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { ww := &workerWatcher{ - container: channel.NewVector(numWorkers), - numWorkers: numWorkers, + container: channel.NewVector(numWorkers), - workers: make([]worker.BaseProcess, 0, numWorkers), + // pass a ptr to the number of workers to avoid blocking in the TTL loop + numWorkers: utils.Uint64(numWorkers), + allocateTimeout: allocateTimeout, + workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, events: events, @@ -72,11 +77,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { // thread safe operation w, err := ww.container.Pop(ctx) - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) - } - if err != nil { + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + return nil, errors.E(op, err) } @@ -92,9 +97,11 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { // try to continuously get free one for { w, err = ww.container.Pop(ctx) - - if errors.Is(errors.WatcherStopped, err) { - return nil, errors.E(op, errors.WatcherStopped) + if err != nil { + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + return nil, errors.E(op, err) } if err != nil { @@ -128,21 +135,63 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { } func (ww *workerWatcher) Allocate() error { - ww.Lock() const op = errors.Op("worker_watcher_allocate_new") + sw, err := ww.allocator() if err != nil { - return errors.E(op, errors.WorkerAllocate, err) + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), + }) + + // if no timeout, return error immediately + if ww.allocateTimeout == 0 { + return errors.E(op, errors.WorkerAllocate, err) + } + + // every half of a second + allocateFreq := time.NewTicker(time.Millisecond * 500) + + tt := time.After(ww.allocateTimeout) + for { + select { + case <-tt: + // reduce number of workers + atomic.AddUint64(ww.numWorkers, ^uint64(0)) + allocateFreq.Stop() + // timeout exceed, worker can't be allocated + return errors.E(op, errors.WorkerAllocate, err) + + case <-allocateFreq.C: + sw, err = ww.allocator() + if err != nil { + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), + }) + continue + } + + // reallocated + allocateFreq.Stop() + goto done + } + } } +done: // add worker to Wait ww.addToWatch(sw) + ww.Lock() // add new worker to the workers slice (to get information about workers in parallel) ww.workers = append(ww.workers, sw) - - // unlock Allocate mutex ww.Unlock() + // push the worker to the container ww.Release(sw) return nil @@ -160,7 +209,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { for i := 0; i < len(ww.workers); i++ { if ww.workers[i].Pid() == pid { ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) - // kill worker + // kill worker, just to be sure it's dead _ = wb.Kill() return } @@ -177,7 +226,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) { } } -// Destroy all underlying container (but let them to complete the task) +// Destroy all underlying container (but let them complete the task) func (ww *workerWatcher) Destroy(_ context.Context) { // destroy container, we don't use ww mutex here, since we should be able to push worker ww.Lock() @@ -192,15 +241,12 @@ func (ww *workerWatcher) Destroy(_ context.Context) { case <-tt.C: ww.Lock() // that might be one of the workers is working - if ww.numWorkers != uint64(len(ww.workers)) { + if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { ww.Unlock() continue } - ww.Unlock() - // unnecessary mutex, but - // just to make sure. All container at this moment are in the container + // All container at this moment are in the container // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.Lock() for i := 0; i < len(ww.workers); i++ { ww.workers[i].State().Set(worker.StateDestroyed) // kill the worker @@ -216,6 +262,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() + if len(ww.workers) == 0 { + return nil + } + base := make([]worker.BaseProcess, 0, len(ww.workers)) for i := 0; i < len(ww.workers); i++ { base = append(base, ww.workers[i]) @@ -253,6 +303,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { Event: events.EventPoolError, Payload: errors.E(op, err), }) + + // no workers at all, panic + if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { + panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err))) + } } } diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/amqp/amqpjobs/config.go index 1ec089f1..ac2f6e53 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/amqp/amqpjobs/config.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs // pipeline rabbitmq info const ( diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 95df02ec..784a102c 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -20,7 +20,11 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +const ( + pluginName string = "amqp" +) + +type consumer struct { sync.Mutex log logger.Logger pq priorityqueue.Queue @@ -58,7 +62,7 @@ type JobConsumer struct { } // NewAMQPConsumer initializes rabbitmq pipeline -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -92,7 +96,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, globalCfg.InitDefault() // PARSE CONFIGURATION END ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: e, @@ -140,7 +144,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_amqp_consumer_from_pipeline") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -163,7 +167,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // PARSE CONFIGURATION ------- - jb := &JobConsumer{ + jb := &consumer{ log: log, eh: e, pq: pq, @@ -214,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { +func (c *consumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != job.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - err := j.handleItem(ctx, fromJob(job)) + err := c.handleItem(ctx, fromJob(job)) if err != nil { return errors.E(op, err) } @@ -232,38 +236,38 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_run") - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { return errors.E(op, err) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -275,9 +279,11 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { } // run listener - j.listener(deliv) + c.listener(deliv) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -287,28 +293,28 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("amqp_driver_state") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: defer func() { - j.publishChan <- pch + c.publishChan <- pch }() - q, err := pch.QueueInspect(j.queue) + q, err := pch.QueueInspect(c.queue) if err != nil { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: q.Name, Active: int64(q.Messages), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil case <-ctx.Done(): @@ -316,37 +322,37 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { } } -func (j *JobConsumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - err := j.consumeChan.Cancel(j.consumeID, true) + err := c.consumeChan.Cancel(c.consumeID, true) if err != nil { - j.log.Error("cancel publish channel, forcing close", "error", err) - errCl := j.consumeChan.Close() + c.log.Error("cancel publish channel, forcing close", "error", err) + errCl := c.consumeChan.Close() if errCl != nil { - j.log.Error("force close failed", "error", err) + c.log.Error("force close failed", "error", err) return } return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -354,40 +360,40 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("amqp listener already in the active state") + c.log.Warn("amqp listener already in the active state") return } var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { - j.log.Error("create channel on rabbitmq connection", "error", err) + c.log.Error("create channel on rabbitmq connection", "error", err) return } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { - j.log.Error("qos set failed", "error", err) + c.log.Error("qos set failed", "error", err) return } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -395,17 +401,17 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { nil, ) if err != nil { - j.log.Error("consume operation failed", "error", err) + c.log.Error("consume operation failed", "error", err) return } // run listener - j.listener(deliv) + c.listener(deliv) // increase number of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -413,11 +419,13 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) Stop(context.Context) error { - j.stopCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -427,13 +435,13 @@ func (j *JobConsumer) Stop(context.Context) error { } // handleItem -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: // return the channel back defer func() { - j.publishChan <- pch + c.publishChan <- pch }() // convert @@ -445,39 +453,39 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") // handle timeouts if msg.Options.DelayDuration() > 0 { - atomic.AddInt64(j.delayed, 1) + atomic.AddInt64(c.delayed, 1) // TODO declare separate method for this if condition // TODO dlx cache channel?? delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue) _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, + dlx: c.exchangeName, + dlxRoutingKey: c.routingKey, dlxTTL: delayMs, dlxExpires: delayMs * 2, }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } @@ -485,7 +493,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now(), diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/amqp/amqpjobs/item.go index 623dcca7..66b70a36 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "context" @@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - item, err := j.unpack(d) + item, err := c.unpack(d) if err != nil { return nil, errors.E(op, err) } @@ -156,10 +156,10 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - item.Options.delayed = j.delayed + item.Options.delayed = c.delayed // requeue func - item.Options.requeueFn = j.handleItem + item.Options.requeueFn = c.handleItem return i, nil } @@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { +func (c *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ - multipleAsk: j.multipleAck, - requeue: j.requeueOnFail, - requeueFn: j.handleItem, + multipleAsk: c.multipleAck, + requeue: c.requeueOnFail, + requeueFn: c.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { @@ -230,7 +230,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { if _, ok := d.Headers[job.RRPriority]; !ok { // set pipe's priority - item.Options.Priority = j.priority + item.Options.Priority = c.priority } else { item.Options.Priority = d.Headers[job.RRPriority].(int64) } diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/amqp/amqpjobs/listener.go index 0b1cd2dc..75c61cad 100644 --- a/plugins/jobs/drivers/amqp/listener.go +++ b/plugins/amqp/amqpjobs/listener.go @@ -1,24 +1,24 @@ -package amqp +package amqpjobs import amqp "github.com/rabbitmq/amqp091-go" -func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) { +func (c *consumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { case msg, ok := <-deliv: if !ok { - j.log.Info("delivery channel closed, leaving the rabbit listener") + c.log.Info("delivery channel closed, leaving the rabbit listener") return } - d, err := j.fromDelivery(msg) + d, err := c.fromDelivery(msg) if err != nil { - j.log.Error("amqp delivery convert", "error", err) + c.log.Error("amqp delivery convert", "error", err) continue } // insert job into the main priority queue - j.pq.Insert(d) + c.pq.Insert(d) } } }() diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go index 56ef10c8..fb5f6911 100644 --- a/plugins/jobs/drivers/amqp/rabbit_init.go +++ b/plugins/amqp/amqpjobs/rabbit_init.go @@ -1,23 +1,23 @@ -package amqp +package amqpjobs import ( "github.com/spiral/errors" ) -func (j *JobConsumer) initRabbitMQ() error { +func (c *consumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. - channel, err := j.conn.Channel() + channel, err := c.conn.Channel() if err != nil { return errors.E(op, err) } // declare an exchange (idempotent operation) err = channel.ExchangeDeclare( - j.exchangeName, - j.exchangeType, + c.exchangeName, + c.exchangeType, true, false, false, @@ -30,10 +30,10 @@ func (j *JobConsumer) initRabbitMQ() error { // verify or declare a queue q, err := channel.QueueDeclare( - j.queue, + c.queue, false, false, - j.exclusive, + c.exclusive, false, nil, ) @@ -44,8 +44,8 @@ func (j *JobConsumer) initRabbitMQ() error { // bind queue to the exchange err = channel.QueueBind( q.Name, - j.routingKey, - j.exchangeName, + c.routingKey, + c.exchangeName, false, nil, ) diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/amqp/amqpjobs/redial.go index 8dc18b8f..8d21784f 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -1,4 +1,4 @@ -package amqp +package amqpjobs import ( "time" @@ -11,70 +11,70 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *JobConsumer) redialer() { //nolint:gocognit +func (c *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") for { select { - case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + case err := <-c.conn.NotifyClose(make(chan *amqp.Error)): if err == nil { return } - j.Lock() + c.Lock() // trash the broken publishing channel - <-j.publishChan + <-c.publishChan - t := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + t := time.Now().UTC() + pipe := c.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeError, Pipeline: pipe.Name(), Driver: pipe.Driver(), Error: err, - Start: time.Now(), + Start: time.Now().UTC(), }) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = j.retryTimeout + expb.MaxElapsedTime = c.retryTimeout operation := func() error { - j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + c.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error - j.conn, dialErr = amqp.Dial(j.connStr) + c.conn, dialErr = amqp.Dial(c.connStr) if dialErr != nil { return errors.E(op, dialErr) } - j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - errInit := j.initRabbitMQ() + errInit := c.initRabbitMQ() if errInit != nil { - j.log.Error("rabbitmq dial", "error", errInit) + c.log.Error("rabbitmq dial", "error", errInit) return errInit } // redeclare consume channel var errConnCh error - j.consumeChan, errConnCh = j.conn.Channel() + c.consumeChan, errConnCh = c.conn.Channel() if errConnCh != nil { return errors.E(op, errConnCh) } // redeclare publish channel - pch, errPubCh := j.conn.Channel() + pch, errPubCh := c.conn.Channel() if errPubCh != nil { return errors.E(op, errPubCh) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -86,23 +86,23 @@ func (j *JobConsumer) redialer() { //nolint:gocognit } // put the fresh publishing channel - j.publishChan <- pch + c.publishChan <- pch // restart listener - j.listener(deliv) + c.listener(deliv) - j.log.Info("queues and subscribers redeclared successfully") + c.log.Info("queues and subscribers redeclared successfully") return nil } retryErr := backoff.Retry(operation, expb) if retryErr != nil { - j.Unlock() - j.log.Error("backoff failed", "error", retryErr) + c.Unlock() + c.log.Error("backoff failed", "error", retryErr) return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -110,27 +110,27 @@ func (j *JobConsumer) redialer() { //nolint:gocognit Elapsed: time.Since(t), }) - j.Unlock() + c.Unlock() - case <-j.stopCh: - if j.publishChan != nil { - pch := <-j.publishChan + case <-c.stopCh: + if c.publishChan != nil { + pch := <-c.publishChan err := pch.Close() if err != nil { - j.log.Error("publish channel close", "error", err) + c.log.Error("publish channel close", "error", err) } } - if j.consumeChan != nil { - err := j.consumeChan.Close() + if c.consumeChan != nil { + err := c.consumeChan.Close() if err != nil { - j.log.Error("consume channel close", "error", err) + c.log.Error("consume channel close", "error", err) } } - if j.conn != nil { - err := j.conn.Close() + if c.conn != nil { + err := c.conn.Close() if err != nil { - j.log.Error("amqp connection close", "error", err) + c.log.Error("amqp connection close", "error", err) } } diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go index 624f4405..c4f5f1da 100644 --- a/plugins/jobs/drivers/amqp/plugin.go +++ b/plugins/amqp/plugin.go @@ -4,6 +4,7 @@ import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -31,10 +32,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) + return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) } // FromPipeline constructs AMQP driver from pipeline func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { - return FromPipeline(pipe, p.log, p.cfg, e, pq) + return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq) } diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go index a8069f5d..a8069f5d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/beanstalk/config.go diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go index d3241b37..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/beanstalk/connection.go diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 6323148b..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -19,7 +19,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type JobConsumer struct { +type consumer struct { log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -43,7 +43,7 @@ type JobConsumer struct { requeueCh chan *Item } -func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -86,7 +86,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -108,7 +108,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return jc, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_beanstalk_consumer") // PARSE CONFIGURATION ------- @@ -139,7 +139,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu } // initialize job consumer - jc := &JobConsumer{ + jc := &consumer{ pq: pq, log: log, eh: e, @@ -160,7 +160,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -178,7 +178,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { +func (j *consumer) handleItem(ctx context.Context, item *Item) error { const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) @@ -215,14 +215,14 @@ func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { return nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { // register the pipeline j.pipeline.Store(p) return nil } // State https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L514 -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (j *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("beanstalk_state") stat, err := j.pool.Stats(ctx) if err != nil { @@ -258,7 +258,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -282,7 +282,7 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *consumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,7 +299,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -326,7 +326,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go index e43207eb..e43207eb 100644 --- a/plugins/jobs/drivers/beanstalk/encode_test.go +++ b/plugins/beanstalk/encode_test.go diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go index f1d7ac76..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/beanstalk/item.go @@ -134,7 +134,7 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { +func (j *consumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go index f1385e70..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/beanstalk/listen.go @@ -4,7 +4,7 @@ import ( "github.com/beanstalkd/go-beanstalk" ) -func (j *JobConsumer) listen() { +func (j *consumer) listen() { for { select { case <-j.stopCh: diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go index 529d1474..529d1474 100644 --- a/plugins/jobs/drivers/beanstalk/plugin.go +++ b/plugins/beanstalk/plugin.go diff --git a/plugins/boltdb/boltjobs/config.go b/plugins/boltdb/boltjobs/config.go new file mode 100644 index 00000000..8cc098c1 --- /dev/null +++ b/plugins/boltdb/boltjobs/config.go @@ -0,0 +1,39 @@ +package boltjobs + +const ( + file string = "file" + priority string = "priority" + prefetch string = "prefetch" +) + +type GlobalCfg struct { + // db file permissions + Permissions int `mapstructure:"permissions"` + // consume timeout +} + +func (c *GlobalCfg) InitDefaults() { + if c.Permissions == 0 { + c.Permissions = 0777 + } +} + +type Config struct { + File string `mapstructure:"file"` + Priority int `mapstructure:"priority"` + Prefetch int `mapstructure:"prefetch"` +} + +func (c *Config) InitDefaults() { + if c.File == "" { + c.File = "rr.db" + } + + if c.Priority == 0 { + c.Priority = 10 + } + + if c.Prefetch == 0 { + c.Prefetch = 1000 + } +} diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go new file mode 100644 index 00000000..ed0eda61 --- /dev/null +++ b/plugins/boltdb/boltjobs/consumer.go @@ -0,0 +1,422 @@ +package boltjobs + +import ( + "bytes" + "context" + "encoding/gob" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +const ( + PluginName string = "boltdb" + rrDB string = "rr.db" + + PushBucket string = "push" + InQueueBucket string = "processing" + DelayBucket string = "delayed" +) + +type consumer struct { + file string + permissions int + priority int + prefetch int + + db *bolt.DB + + bPool sync.Pool + log logger.Logger + eh events.Handler + pq priorityqueue.Queue + pipeline atomic.Value + cond *sync.Cond + + listeners uint32 + active *uint64 + delayed *uint64 + + stopCh chan struct{} +} + +func NewBoltDBJobs(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg := &Config{} + err = cfg.UnmarshalKey(configKey, localCfg) + if err != nil { + return nil, errors.E(op, err) + } + + localCfg.InitDefaults() + conf.InitDefaults() + + db, err := bolt.Open(localCfg.File, os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + permissions: conf.Permissions, + file: localCfg.File, + priority: localCfg.Priority, + prefetch: localCfg.Prefetch, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { + const op = errors.Op("init_boltdb_jobs") + + // if no global section + if !cfg.Has(PluginName) { + return nil, errors.E(op, errors.Str("no global boltdb configuration")) + } + + conf := &GlobalCfg{} + err := cfg.UnmarshalKey(PluginName, conf) + if err != nil { + return nil, errors.E(op, err) + } + + // add default values + conf.InitDefaults() + + db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(conf.Permissions), &bolt.Options{ + Timeout: time.Second * 20, + NoGrowSync: false, + NoFreelistSync: false, + ReadOnly: false, + NoSync: false, + }) + + if err != nil { + return nil, errors.E(op, err) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + const upOp = errors.Op("boltdb_plugin_update") + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(DelayBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, upOp) + } + + _, err = tx.CreateBucketIfNotExists(utils.AsBytes(InQueueBucket)) + if err != nil { + return errors.E(op, upOp) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + cursor := inQb.Cursor() + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + // get all items, which are in the InQueueBucket and put them into the PushBucket + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + err = pushB.Put(k, v) + if err != nil { + return errors.E(op, err) + } + } + + return nil + }) + + if err != nil { + return nil, errors.E(op, err) + } + + return &consumer{ + file: pipeline.String(file, rrDB), + priority: pipeline.Int(priority, 10), + prefetch: pipeline.Int(prefetch, 100), + permissions: conf.Permissions, + + bPool: sync.Pool{New: func() interface{} { + return new(bytes.Buffer) + }}, + cond: sync.NewCond(&sync.Mutex{}), + + delayed: utils.Uint64(0), + active: utils.Uint64(0), + + db: db, + log: log, + eh: e, + pq: pq, + stopCh: make(chan struct{}, 2), + }, nil +} + +func (c *consumer) Push(_ context.Context, job *job.Job) error { + const op = errors.Op("boltdb_jobs_push") + err := c.db.Update(func(tx *bolt.Tx) error { + item := fromJob(job) + // pool with buffers + buf := c.get() + // encode the job + enc := gob.NewEncoder(buf) + err := enc.Encode(item) + if err != nil { + c.put(buf) + return errors.E(op, err) + } + + value := make([]byte, buf.Len()) + copy(value, buf.Bytes()) + c.put(buf) + + // handle delay + if item.Options.Delay > 0 { + b := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339) + + err = b.Put(utils.AsBytes(tKey), value) + if err != nil { + return errors.E(op, err) + } + + atomic.AddUint64(c.delayed, 1) + + return nil + } + + b := tx.Bucket(utils.AsBytes(PushBucket)) + err = b.Put(utils.AsBytes(item.ID()), value) + if err != nil { + return errors.E(op, err) + } + + // increment active counter + atomic.AddUint64(c.active, 1) + + return nil + }) + + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) + return nil +} + +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("boltdb_run") + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + + return nil +} + +func (c *consumer) Stop(_ context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + } + + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil +} + +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested pause on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 0 { + c.log.Warn("no active listeners, nothing to pause") + return + } + + c.stopCh <- struct{}{} + c.stopCh <- struct{}{} + + atomic.AddUint32(&c.listeners, ^uint32(0)) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipePaused, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + c.log.Error("no such pipeline", "requested resume on: ", p) + } + + l := atomic.LoadUint32(&c.listeners) + // no active listeners + if l == 1 { + c.log.Warn("amqp listener already in the active state") + return + } + + // run listener + go c.listener() + go c.delayedJobsListener() + + // increase number of listeners + atomic.AddUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) +} + +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) + + return &jobState.State{ + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Queue: PushBucket, + Active: int64(atomic.LoadUint64(c.active)), + Delayed: int64(atomic.LoadUint64(c.delayed)), + Ready: toBool(atomic.LoadUint32(&c.listeners)), + }, nil +} + +// Private + +func (c *consumer) get() *bytes.Buffer { + return c.bPool.Get().(*bytes.Buffer) +} + +func (c *consumer) put(b *bytes.Buffer) { + b.Reset() + c.bPool.Put(b) +} + +func toBool(r uint32) bool { + return r > 0 +} diff --git a/plugins/boltdb/boltjobs/item.go b/plugins/boltdb/boltjobs/item.go new file mode 100644 index 00000000..837f8c63 --- /dev/null +++ b/plugins/boltdb/boltjobs/item.go @@ -0,0 +1,229 @@ +package boltjobs + +import ( + "bytes" + "encoding/gob" + "sync/atomic" + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" + "go.etcd.io/bbolt" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // private + db *bbolt.DB + active *uint64 + delayed *uint64 +} + +func (i *Item) ID() string { + return i.Ident +} + +func (i *Item) Priority() int64 { + return i.Options.Priority +} + +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) +} + +func (i *Item) Context() ([]byte, error) { + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil +} + +func (i *Item) Ack() error { + const op = errors.Op("boltdb_item_ack") + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + if i.Options.Delay > 0 { + atomic.AddUint64(i.Options.delayed, ^uint64(0)) + } else { + atomic.AddUint64(i.Options.active, ^uint64(0)) + } + + return tx.Commit() +} + +func (i *Item) Nack() error { + const op = errors.Op("boltdb_item_ack") + /* + steps: + 1. begin tx + 2. get item by ID from the InQueueBucket (previously put in the listener) + 3. put it back to the PushBucket + 4. Delete it from the InQueueBucket + */ + tx, err := i.Options.db.Begin(true) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + v := inQb.Get(utils.AsBytes(i.ID())) + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + + err = pushB.Put(utils.AsBytes(i.ID()), v) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + _ = tx.Rollback() + return errors.E(op, err) + } + + return tx.Commit() +} + +/* +Requeue algorithm: +1. Rewrite item headers and delay. +2. Begin writable transaction on attached to the item db. +3. Delete item from the InQueueBucket +4. Handle items with the delay: + 4.1. Get DelayBucket + 4.2. Make a key by adding the delay to the time.Now() in RFC3339 format + 4.3. Put this key with value to the DelayBucket +5. W/o delay, put the key with value to the PushBucket (requeue) +*/ +func (i *Item) Requeue(headers map[string][]string, delay int64) error { + const op = errors.Op("boltdb_item_requeue") + i.Headers = headers + i.Options.Delay = delay + + tx, err := i.Options.db.Begin(true) + if err != nil { + return errors.E(op, err) + } + + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + err = inQb.Delete(utils.AsBytes(i.ID())) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + // encode the item + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err = enc.Encode(i) + val := make([]byte, buf.Len()) + copy(val, buf.Bytes()) + buf.Reset() + + if delay > 0 { + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + tKey := time.Now().UTC().Add(time.Second * time.Duration(delay)).Format(time.RFC3339) + + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = delayB.Put(utils.AsBytes(tKey), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() + } + + pushB := tx.Bucket(utils.AsBytes(PushBucket)) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + err = pushB.Put(utils.AsBytes(i.ID()), val) + if err != nil { + return errors.E(op, i.rollback(err, tx)) + } + + return tx.Commit() +} + +func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { + i.Options.db = db + i.Options.active = active + i.Options.delayed = delayed +} + +func (i *Item) rollback(err error, tx *bbolt.Tx) error { + errR := tx.Rollback() + if errR != nil { + return errors.Errorf("transaction commit error: %v, rollback failed: %v", err, errR) + } + return errors.Errorf("transaction commit error: %v", err) +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Headers: job.Headers, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + }, + } +} diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go new file mode 100644 index 00000000..7c161555 --- /dev/null +++ b/plugins/boltdb/boltjobs/listener.go @@ -0,0 +1,151 @@ +package boltjobs + +import ( + "bytes" + "encoding/gob" + "time" + + "github.com/spiral/roadrunner/v2/utils" + bolt "go.etcd.io/bbolt" +) + +func (c *consumer) listener() { + tt := time.NewTicker(time.Millisecond) + defer tt.Stop() + for { + select { + case <-c.stopCh: + c.log.Info("boltdb listener stopped") + return + case <-tt.C: + tx, err := c.db.Begin(true) + if err != nil { + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue + } + + b := tx.Bucket(utils.AsBytes(PushBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + // get first item + k, v := b.Cursor().First() + if k == nil && v == nil { + _ = tx.Commit() + continue + } + + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = b.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) + } + } +} + +func (c *consumer) delayedJobsListener() { + tt := time.NewTicker(time.Second) + defer tt.Stop() + + // just some 90's + loc, err := time.LoadLocation("UTC") + if err != nil { + c.log.Error("failed to load location, delayed jobs won't work", "error", err) + return + } + + var startDate = utils.AsBytes(time.Date(1990, 1, 1, 0, 0, 0, 0, loc).Format(time.RFC3339)) + + for { + select { + case <-c.stopCh: + c.log.Info("boltdb listener stopped") + return + case <-tt.C: + tx, err := c.db.Begin(true) + if err != nil { + c.log.Error("failed to begin writable transaction, job will be read on the next attempt", "error", err) + continue + } + + delayB := tx.Bucket(utils.AsBytes(DelayBucket)) + inQb := tx.Bucket(utils.AsBytes(InQueueBucket)) + + cursor := delayB.Cursor() + endDate := utils.AsBytes(time.Now().UTC().Format(time.RFC3339)) + + for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { + buf := bytes.NewReader(v) + dec := gob.NewDecoder(buf) + + item := &Item{} + err = dec.Decode(item) + if err != nil { + c.rollback(err, tx) + continue + } + + err = inQb.Put(utils.AsBytes(item.ID()), v) + if err != nil { + c.rollback(err, tx) + continue + } + + // delete key from the PushBucket + err = delayB.Delete(k) + if err != nil { + c.rollback(err, tx) + continue + } + + // attach pointer to the DB + item.attachDB(c.db, c.active, c.delayed) + // as the last step, after commit, put the item into the PQ + c.pq.Insert(item) + } + + err = tx.Commit() + if err != nil { + c.rollback(err, tx) + continue + } + } + } +} + +func (c *consumer) rollback(err error, tx *bolt.Tx) { + errR := tx.Rollback() + if errR != nil { + c.log.Error("transaction commit error, rollback failed", "error", err, "rollback error", errR) + } + + c.log.Error("transaction commit error, rollback succeed", "error", err) +} diff --git a/plugins/kv/drivers/boltdb/config.go b/plugins/boltdb/boltkv/config.go index 0beb209b..56d00674 100644 --- a/plugins/kv/drivers/boltdb/config.go +++ b/plugins/boltdb/boltkv/config.go @@ -1,4 +1,4 @@ -package boltdb +package boltkv type Config struct { // File is boltDB file. No need to create it by your own, diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/boltdb/boltkv/driver.go index 15a5674f..ba1450cd 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/boltdb/boltkv/driver.go @@ -1,4 +1,4 @@ -package boltdb +package boltkv import ( "bytes" @@ -16,6 +16,10 @@ import ( bolt "go.etcd.io/bbolt" ) +const ( + RootPluginName string = "kv" +) + type Driver struct { clearMu sync.RWMutex // db instance @@ -24,7 +28,8 @@ type Driver struct { bucket []byte log logger.Logger cfg *Config - // gc contains key which are contain timeouts + + // gc contains keys with timeouts gc sync.Map // default timeout for cache cleanup is 1 minute timeout time.Duration @@ -36,6 +41,10 @@ type Driver struct { func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stop chan struct{}) (*Driver, error) { const op = errors.Op("new_boltdb_driver") + if !cfgPlugin.Has(RootPluginName) { + return nil, errors.E(op, errors.Str("no kv section in the configuration")) + } + d := &Driver{ log: log, stop: stop, @@ -157,7 +166,7 @@ func (d *Driver) Get(key string) ([]byte, error) { } // set the value - val = []byte(i) + val = utils.AsBytes(i) } return nil }) diff --git a/plugins/boltdb/doc/boltjobs.drawio b/plugins/boltdb/doc/boltjobs.drawio new file mode 100644 index 00000000..7d1f3531 --- /dev/null +++ b/plugins/boltdb/doc/boltjobs.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-08-31T09:34:11.357Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.164 Electron/13.2.3 Safari/537.36" etag="KiNZAPNeIcd5kV3EE5lF" version="14.6.13" type="device"><diagram id="NuJwivb--D1hymDgb9NQ" name="Page-1">7V1bc5s4GP01nmkfkgHE9TF2km13up20me1uH2WQbRJsuYATe3/9ijtICiHmIrvBnWmNEAjrfNejT3QCZuv9Hz7crv7CDvImiuTsJ+B6oigKABr5J2o5JC0yUJWkZem7TtpWNNy7/6G0UUpbd66DgkrHEGMvdLfVRhtvNsgOK23Q9/FztdsCe9VRt3CJmIZ7G3ps6z+uE67SVl2SihOfkLtcZUMr2Zk1zHqnDcEKOvi51ARuJmDmYxwm39b7GfKi6csmJrnu9oWz+ZP5aBM2ucDcf0ZP8pX6w3iw/K8/fijw1/VFikYQHrJfjBwyAekh9sMVXuIN9G6K1qmPdxsHRXeVyFHR5wvGW9Iok8YHFIaHFE24CzFpWoVrLz1LHtg//Btdf6llhz/T28UH1/vK0SE9WuBNeAvXrhc1zGBgQ8eF5OlnRO7S0+mYMiDHyU+Lfs+LM5aJFPSXKKyZJjnHi4g6wmtEHotc5yMPhu5T9f4wFbll3i+/9A67ZGRFSvVDV1PROORiVb1F8lzpVWVoX7uRSt0owDvfRsyNyJfS7ymaYsl5gxRl2jqwGO3dsCRF5Ohn6UwhQ9HBoSxQpyR6CTKviV4DEVVbimhFJt4sAMl9n6C3S0e6+/v+E2n5fjdjRKMK/PPKDdH9FsaT8EzcSBXkN078wvW8GfawHw8F5tA2nag9CH38iEpnFKCqmpND9YT8EO3rwWInN71AM6vqp2R6/FxyGZkbWJW8Ba2mneFhiNRHuaSNhW7y9VGcXoGGeqWJ1CvzXHAs7GrFqhZG9uTsalP8hdpVa8RfNP66SPwB41enJPm5nrb0qVUnuSAf0+Q5SaADCzjHwtTKp+pVl6qZGutSZY5LBX25VFlmJv1EdVGcTrW1lfzsxqDCK7VhmtRVdqOywe0uWJGW6c5+RGG3uqhFf3i6qMefFL+yjsYfITqqaBQwoGHcSwPYmZIqQh1mNRGVapX0pUQ050dqHGZNwmM7aG7OubacfG5vX5eTDs2Bfg4hlgxOR2Qachfy5ExiLFk5hyBL1hgDP8PbaPwHPOcKxxc4R14VUOi5yw35bpNJQkTlppFldW3oXaUn1q7jJLKDAvc/OI/vF8GzjfxW/Ju06US75ul4E43lWPKUFE/HmuRMdBnKGp140e4TkbOUdNJa+vYLWbIqPuRCoXwDXiwC1A9rqYvVfKO56tMS0RONKUL1ZaOXoJEkB1QGMXDUmE0AGzZ6bhCiDTESivRBltbBx24jyAXSbZsXATiGNZeOdgitIkUmhOdFiuqQkaLMRvXyZTwHheWXiO6Sv1aI/P15822HdigL+XWPPOB07ldw03/tosW8eAIvgngGr0gHGWz38TRm58m3ZfSvEg14jTwURiMsfLzOh4tE5YX0gqAQ8n2PhxZhO88TmasghZ4RoQ3eIDHSQ5EBwGSlJ5eUQfh1WSgx+5ag8Qw9h9HUc1jDeA59aM9hsJ7j010k7th/RH4QzSvGXkuvQckFE79SbgXJjoYMnluxdANAvamItDIEskQtdGuG6JU2RSgtKF0qw6x9vzkd6cYUWA1NQamgRkACmeVCp+8NfmcReCFnHWihRjCL9IZcsisZYByBFH9OVDrEcowW49G/o2CLNwEa1os7EJkLbnKo2yaaL8R4cV28FxdaMCOVdLf5QnvLdYPBdDQD8rRJ4OwpSyr6FdqPw6rnwrQRn7uZm5qqNbatrdTTtKraaUrCtZOl0m58n0xNc2qkC1q+KTnSBXXfHC4rq4xI4TI41JrGQYvm2bsLhQRnREdFQsq5GNOhypbiS698Hx5KHVKdaE6dWDq1J4G6QDFr+5MvySN0yq8obFXV1dCW3tGQ6ag8S28qc6ALolPEm3ogOJU+jk4xWlmPU0ikFLWhXWmdZh9nWHJ3lUmqVW9YZBnUXtCPZQGCV5TbkwC/t+y2ZQmPlF2qTM6iN+pRF1j1/XsSXbEZ8HslsBrLLhBKYCmcGlgfh9h+n8tQ8smlyOrZVKe32SlyatqrNVReVejyFFveeE3GjcbvoYC9PcM8QKF642Xj3sqPsui0Uh0WxQDzmHQsVx4dVQr0+/JdJDyqYsmxvQpvZ1B/hBdLT+ZFXX/GdWSV0q5KJdll3seN6jocMquXI8xlFc2CYJWFGRiDwnxCe0veT4AMGgfIpkgfq57hAmI1/jJaB2C3t5Jknqh0iKHDKXrVMl8hraj+hqZRgtlH4s+mdt/Rr8hDtYwL37r2CR0D8tc+VV2T1GEyO5pntIS/yiN7YdY5WZauCXERlsVsaFmEvh8ke0o2txtVV5Y04bqritTd41gZSndbx4wCdDfDs/eNx0dFBQZd/SbVRwV0f1Wu7w/q+/cTRagsn5GRTD1vd2tdMtUD36Q14ZsG3e6mM/B83pDjb0mcJ817IAJbVzgMQAQackMX0d8bK7gePCGPHl4hjxi83iVRRJUQ8dZilEGJIpUl3JOtpSsUc7tOahh9uFlGmH6A8T0J2kH82OEqsecJ4nGvj+Udp6XNowlnjPZb1ydqmt+jLXs83EbSYWVFpdbtdB6paA0aIdao/6j4tWAa1EKAxdsVzHOy/Sk+u80jUdDEkEeaHan7V/w8iZ5hmtkCot3SIzpUFTcOn0a4ixfCqVW4uekdV3l7w1tjg152Gf7ssKLeLxblbzHn34WvpsqDAcf+8qKv/hBkX1zM0m0jgiUEqWU5VTiCCoMguxVzRLBA0KJiII2TAg2LIFuOz9JmI4IFgnL+4tMsjOVs+RkWQnYNia3jHyEsQUjvvODt2hoWQjZpHSGsh5AiHgzOUuGwELIk3whhvSEFp6aF7Kt/2C0lI4SleJR6NRiX/RsUQZbQYUuDRgQLBOktoDonrR8WQZbFGXWwVgfp/Uqic8LMJowINq6doBdRLNEQssTMaEbrIKTfsMplwwdFkCVm2JdmjggWCOpKA4K7IwjJYfEfTyY1IMV/4Alu/gc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/boltdb/doc/job_lifecycle.md b/plugins/boltdb/doc/job_lifecycle.md new file mode 100644 index 00000000..317aec90 --- /dev/null +++ b/plugins/boltdb/doc/job_lifecycle.md @@ -0,0 +1,10 @@ +### Job lifecycle + +There are several boltdb buckets: + +1. `PushBucket` - used for pushed jobs via RPC. +2. `InQueueBucket` - when the job consumed from the `PushBucket`, in the same transaction, it copied into the priority queue and +get into the `InQueueBucket` waiting to acknowledgement. +3. `DelayBucket` - used for delayed jobs. RFC3339 used as a timestamp to track delay expiration. + +`` diff --git a/plugins/boltdb/plugin.go b/plugins/boltdb/plugin.go new file mode 100644 index 00000000..683b26f1 --- /dev/null +++ b/plugins/boltdb/plugin.go @@ -0,0 +1,82 @@ +package boltdb + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltjobs" + "github.com/spiral/roadrunner/v2/plugins/boltdb/boltkv" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "boltdb" +) + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger + // stop is used to stop keys GC and close boltdb connection + stop chan struct{} + + drivers uint +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.stop = make(chan struct{}) + p.log = log + p.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (p *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (p *Plugin) Stop() error { + if p.drivers > 0 { + for i := uint(0); i < p.drivers; i++ { + // send close signal to every driver + p.stop <- struct{}{} + } + } + return nil +} + +// Name returns plugin name +func (p *Plugin) Name() string { + return PluginName +} + +// Available interface implementation +func (p *Plugin) Available() {} + +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := boltkv.NewBoltDBDriver(p.log, key, p.cfgPlugin, p.stop) + if err != nil { + return nil, errors.E(op, err) + } + + // save driver number to release resources after Stop + p.drivers++ + + return st, nil +} + +// JOBS bbolt implementation + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.NewBoltDBJobs(configKey, p.log, p.cfgPlugin, e, queue) +} + +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, queue priorityqueue.Queue) (jobs.Consumer, error) { + return boltjobs.FromPipeline(pipe, p.log, p.cfgPlugin, e, queue) +} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 889dc2fa..a2390df5 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" @@ -16,9 +15,6 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" - - redis string = "redis" - memory string = "memory" ) type Plugin struct { @@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error { } func (p *Plugin) PublishAsync(m *pubsub.Message) { + // TODO(rustatian) channel here? go func() { p.Lock() defer p.Unlock() @@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { err := p.publishers[j].Publish(m) if err != nil { p.log.Error("publishAsync", "error", err) - // continue publish to other registered publishers + // continue publishing to the other registered publishers continue } } @@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, key) - switch val.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[uuid.NewString()] = ps + drName := val.(map[string]interface{})[driver] - return ps, nil - case redis: - if _, ok := p.constructors[redis]; !ok { - return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) + // try local config first + if p.cfgPlugin.Has(configKey) { + ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps - return ps, nil - // then try global if local does not exist - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) + return ps, nil + } else { + // try global driver section + ps, err := p.constructors[drStr].PSConstruct(drStr) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps + return ps, nil } } diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go index f0992cd6..8870bb0f 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/ephemeral/consumer.go @@ -25,7 +25,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobConsumer struct { +type consumer struct { cfg *Config log logger.Logger eh events.Handler @@ -43,10 +43,10 @@ type JobConsumer struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - jb := &JobConsumer{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -88,16 +88,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - _, ok := j.pipeline.Load().(*pipeline.Pipeline) + _, ok := c.pipeline.Load().(*pipeline.Pipeline) if !ok { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } @@ -105,42 +105,42 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) State(_ context.Context) (*jobState.State, error) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: pipe.Name(), - Active: atomic.LoadInt64(j.active), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), + Active: atomic.LoadInt64(c.active), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil } -func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) +func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { + c.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop the consumer - j.stopCh <- struct{}{} + c.stopCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -149,24 +149,24 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // listener already active if l == 1 { - j.log.Warn("listener already in the active state") + c.log.Warn("listener already in the active state") return } // resume the consumer on the same channel - j.consume() + c.consume() - atomic.StoreUint32(&j.listeners, 1) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Start: time.Now(), @@ -175,8 +175,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { - j.eh.Push(events.JobEvent{ +func (c *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -185,84 +185,79 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(ctx context.Context) error { - const op = errors.Op("ephemeral_plugin_stop") +func (c *consumer) Stop(_ context.Context) error { + pipe := c.pipeline.Load().(*pipeline.Pipeline) - pipe := j.pipeline.Load().(*pipeline.Pipeline) - - select { - // return from the consumer - case j.stopCh <- struct{}{}: - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe.Name(), - Start: time.Now(), - Elapsed: 0, - }) + if atomic.LoadUint32(&c.listeners) > 0 { + c.stopCh <- struct{}{} + } - return nil + c.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) - case <-ctx.Done(): - return errors.E(op, ctx.Err()) - } + return nil } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) // goroutines here. We should limit goroutines here. if msg.Options.Delay > 0 { // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutines) >= goroutinesMax { + if atomic.LoadUint64(&c.goroutines) >= goroutinesMax { return errors.E(op, errors.Str("max concurrency number reached")) } go func(jj *Item) { - atomic.AddUint64(&j.goroutines, 1) - atomic.AddInt64(j.delayed, 1) + atomic.AddUint64(&c.goroutines, 1) + atomic.AddInt64(c.delayed, 1) time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired - j.localPrefetch <- jj + c.localPrefetch <- jj - atomic.AddUint64(&j.goroutines, ^uint64(0)) + atomic.AddUint64(&c.goroutines, ^uint64(0)) }(msg) return nil } // increase number of the active jobs - atomic.AddInt64(j.active, 1) + atomic.AddInt64(c.active, 1) // insert to the local, limited pipeline select { - case j.localPrefetch <- msg: + case c.localPrefetch <- msg: return nil case <-ctx.Done(): - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", c.cfg.Prefetch, ctx.Err())) } } -func (j *JobConsumer) consume() { +func (c *consumer) consume() { go func() { // redirect for { select { - case item, ok := <-j.localPrefetch: + case item, ok := <-c.localPrefetch: if !ok { - j.log.Warn("ephemeral local prefetch queue was closed") + c.log.Warn("ephemeral local prefetch queue was closed") return } // set requeue channel - item.Options.requeueFn = j.handleItem - item.Options.active = j.active - item.Options.delayed = j.delayed + item.Options.requeueFn = c.handleItem + item.Options.active = c.active + item.Options.delayed = c.delayed - j.pq.Insert(item) - case <-j.stopCh: + c.pq.Insert(item) + case <-c.stopCh: return } } diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go index 3298424d..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/ephemeral/item.go diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go index 28495abb..28495abb 100644 --- a/plugins/jobs/drivers/ephemeral/plugin.go +++ b/plugins/ephemeral/plugin.go diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/job.go index 390f44b5..06c3254e 100644 --- a/plugins/jobs/job/general.go +++ b/plugins/jobs/job/job.go @@ -1,5 +1,9 @@ package job +import ( + "time" +) + // constant keys to pack/unpack messages from different drivers const ( RRID string = "rr_id" @@ -27,3 +31,32 @@ type Job struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` +} + +// Merge merges job options. +func (o *Options) Merge(from *Options) { + if o.Pipeline == "" { + o.Pipeline = from.Pipeline + } + + if o.Delay == 0 { + o.Delay = from.Delay + } +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go deleted file mode 100644 index b7e4ed36..00000000 --- a/plugins/jobs/job/job_options.go +++ /dev/null @@ -1,32 +0,0 @@ -package job - -import "time" - -// Options carry information about how to handle given job. -type Options struct { - // Priority is job priority, default - 10 - // pointer to distinguish 0 as a priority and nil as priority not set - Priority int64 `json:"priority"` - - // Pipeline manually specified pipeline. - Pipeline string `json:"pipeline,omitempty"` - - // Delay defines time duration to delay execution for. Defaults to none. - Delay int64 `json:"delay,omitempty"` -} - -// Merge merges job options. -func (o *Options) Merge(from *Options) { - if o.Pipeline == "" { - o.Pipeline = from.Pipeline - } - - if o.Delay == 0 { - o.Delay = from.Delay - } -} - -// DelayDuration returns delay duration in a form of time.Duration. -func (o *Options) DelayDuration() time.Duration { - return time.Second * time.Duration(o.Delay) -} diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_test.go index a47151a3..a47151a3 100644 --- a/plugins/jobs/job/job_options_test.go +++ b/plugins/jobs/job/job_test.go diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5e62c5c5..3f3fa196 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -177,8 +177,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return true }) + // do not continue processing, immediately stop if channel contains an error + if len(errCh) > 0 { + return errCh + } + var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"}) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}) if err != nil { errCh <- err return errCh @@ -219,6 +224,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit if err != nil { p.events.Push(events.JobEvent{ Event: events.EventJobError, + Error: err, ID: jb.ID(), Start: start, Elapsed: time.Since(start), @@ -243,6 +249,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.events.Push(events.JobEvent{ Event: events.EventJobError, ID: jb.ID(), + Error: err, Start: start, Elapsed: time.Since(start), }) @@ -266,6 +273,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.events.Push(events.JobEvent{ Event: events.EventJobError, ID: jb.ID(), + Error: err, Start: start, Elapsed: time.Since(start), }) @@ -279,6 +287,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Start: start, Elapsed: time.Since(start), }) + + continue } // handle the response protocol @@ -288,6 +298,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Event: events.EventJobError, ID: jb.ID(), Start: start, + Error: err, Elapsed: time.Since(start), }) p.putPayload(exec) @@ -307,6 +318,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit Start: start, Elapsed: time.Since(start), }) + // return payload p.putPayload(exec) } @@ -343,6 +355,10 @@ func (p *Plugin) Stop() error { // just wait pollers for 5 seconds before exit time.Sleep(time.Second * 5) + p.Lock() + p.workersPool.Destroy(context.Background()) + p.Unlock() + return nil } diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go deleted file mode 100644 index c839130f..00000000 --- a/plugins/kv/drivers/boltdb/plugin.go +++ /dev/null @@ -1,71 +0,0 @@ -package boltdb - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "boltdb" - RootPluginName string = "kv" -) - -// Plugin BoltDB K/V storage. -type Plugin struct { - cfgPlugin config.Configurer - // logger - log logger.Logger - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - - drivers uint -} - -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(RootPluginName) { - return errors.E(errors.Disabled) - } - - s.stop = make(chan struct{}) - s.log = log - s.cfgPlugin = cfg - return nil -} - -// Serve is noop here -func (s *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (s *Plugin) Stop() error { - if s.drivers > 0 { - for i := uint(0); i < s.drivers; i++ { - // send close signal to every driver - s.stop <- struct{}{} - } - } - return nil -} - -func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("boltdb_plugin_provide") - st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) - if err != nil { - return nil, errors.E(op, err) - } - - // save driver number to release resources after Stop - s.drivers++ - - return st, nil -} - -// Name returns plugin name -func (s *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (s *Plugin) Available() {} diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 53fade97..c6ca96c3 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -16,11 +16,6 @@ const PluginName string = "kv" const ( // driver is the mandatory field which should present in every storage driver string = "driver" - - memcached string = "memcached" - boltdb string = "boltdb" - redis string = "redis" - memory string = "memory" ) // Plugin for the unified storage @@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { //nolint:gocognit +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config // value - storage - /* - For example we can have here 2 storages (but they are not pre-configured) - for the boltdb and memcached - We should provide here the actual configs for the all requested storages - kv: - boltdb-south: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - boltdb-north: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - memcached: - driver: memcached - addr: [ "127.0.0.1:11211" ] - - - For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - when user requests for example boltdb-south, we should provide that particular preconfigured storage - */ + // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + // when user requests for example boltdb-south, we should provide that particular preconfigured storage + for k, v := range p.cfg.Data { // for example if the key not properly formatted (yaml) if v == nil { @@ -109,43 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the configuration - switch v.(map[string]interface{})[driver] { - case memcached: - if _, ok := p.constructors[memcached]; !ok { - p.log.Warn("no memcached constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memcached].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - - case boltdb: - if _, ok := p.constructors[boltdb]; !ok { - p.log.Warn("no boltdb constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[boltdb].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } + drName := v.(map[string]interface{})[driver] - // save the storage - p.storages[k] = storage - case memory: - if _, ok := p.constructors[memory]; !ok { - p.log.Warn("no in-memory constructors registered", "registered", p.constructors) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) continue } - storage, err := p.constructors[memory].KVConstruct(configKey) + storage, err := p.constructors[drStr].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -153,42 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage - case redis: - if _, ok := p.constructors[redis]; !ok { - p.log.Warn("no redis constructors registered", "registered", p.constructors) - continue - } - - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case p.cfgPlugin.Has(redis): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - continue - default: - // otherwise - error, no local or global config - p.log.Warn("no global or local redis configuration provided", "key", configKey) - continue - } - - default: - p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) } + + continue } return errCh @@ -220,5 +129,4 @@ func (p *Plugin) Name() string { } // Available interface implementation -func (p *Plugin) Available() { -} +func (p *Plugin) Available() {} diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go index 6d413790..6d413790 100644 --- a/plugins/kv/drivers/memcached/config.go +++ b/plugins/memcached/config.go diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go index e24747fe..e24747fe 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/memcached/driver.go diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/memcached/plugin.go diff --git a/plugins/resetter/plugin.go b/plugins/resetter/plugin.go index b2fe59af..191185ae 100644 --- a/plugins/resetter/plugin.go +++ b/plugins/resetter/plugin.go @@ -21,7 +21,7 @@ func (p *Plugin) Reset(name string) error { const op = errors.Op("resetter_plugin_reset_by_name") svc, ok := p.registry[name] if !ok { - return errors.E(op, errors.Errorf("no such service: %s", name)) + return errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Reset() diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 16e3bd8c..5f5f2df9 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -216,7 +216,7 @@ func (server *Plugin) collectPoolEvents(event interface{}) { case events.EventMaxMemory: server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) case events.EventNoFreeWorkers: - server.log.Warn("no free workers in pool", "error", we.Payload.(error).Error()) + server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error()) case events.EventPoolError: server.log.Error("pool error", "error", we.Payload.(error).Error()) case events.EventSupervisorError: diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go index 9b2a1ca8..9b2a1ca8 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/sqs/config.go diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go index 17af1caa..dfbda154 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -24,7 +24,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -type JobConsumer struct { +type consumer struct { sync.Mutex pq priorityqueue.Queue log logger.Logger @@ -56,7 +56,7 @@ type JobConsumer struct { pauseCh chan struct{} } -func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no such key - error @@ -88,7 +88,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure globalCfg.InitDefault() // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -142,7 +142,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure return jb, nil } -func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_sqs_consumer") // if no global section @@ -173,7 +173,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf } // initialize job consumer - jb := &JobConsumer{ + jb := &consumer{ pq: pq, log: log, eh: e, @@ -227,12 +227,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (c *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != jb.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } @@ -243,17 +243,17 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - err := j.handleItem(ctx, fromJob(jb)) + err := c.handleItem(ctx, fromJob(jb)) if err != nil { return errors.E(op, err) } return nil } -func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("sqs_state") - attr, err := j.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ - QueueUrl: j.queueURL, + attr, err := c.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + QueueUrl: c.queueURL, AttributeNames: []types.QueueAttributeName{ types.QueueAttributeNameApproximateNumberOfMessages, types.QueueAttributeNameApproximateNumberOfMessagesDelayed, @@ -265,13 +265,13 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) out := &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), - Queue: *j.queueURL, - Ready: ready(atomic.LoadUint32(&j.listeners)), + Queue: *c.queueURL, + Ready: ready(atomic.LoadUint32(&c.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -292,28 +292,28 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { return out, nil } -func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("sqs_run") - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -323,11 +323,13 @@ func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { - j.pauseCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + if atomic.LoadUint32(&c.listeners) > 0 { + c.pauseCh <- struct{}{} + } - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -336,27 +338,27 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (c *consumer) Pause(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // stop consume - j.pauseCh <- struct{}{} + c.pauseCh <- struct{}{} - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -364,28 +366,28 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (c *consumer) Resume(_ context.Context, p string) { // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) + c.log.Error("no such pipeline", "requested", p, "actual", pipe.Name()) return } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("sqs listener already in the active state") + c.log.Warn("sqs listener already in the active state") return } // start listener - go j.listen(context.Background()) + go c.listen(context.Background()) // increase num of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -393,12 +395,12 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { }) } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { - d, err := msg.pack(j.queueURL) +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { + d, err := msg.pack(c.queueURL) if err != nil { return err } - _, err = j.client.SendMessage(ctx, d) + _, err = c.client.SendMessage(ctx, d) if err != nil { return err } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go index df72b2e5..969d8b5b 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/sqs/item.go @@ -22,6 +22,7 @@ const ( ) var itemAttributes = []string{ + job.RRID, job.RRJob, job.RRDelay, job.RRPriority, @@ -184,6 +185,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { QueueUrl: queue, DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ + job.RRID: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Ident)}, job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil}, @@ -192,7 +194,7 @@ func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) { }, nil } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { +func (c *consumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -228,6 +230,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { item := &Item{ Job: *msg.MessageAttributes[job.RRJob].StringValue, + Ident: *msg.MessageAttributes[job.RRID].StringValue, Payload: *msg.Body, Headers: h, Options: &Options{ @@ -236,10 +239,10 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { // private approxReceiveCount: int64(recCount), - client: j.client, - queue: j.queueURL, + client: c.client, + queue: c.queueURL, receiptHandler: msg.ReceiptHandle, - requeueFn: j.handleItem, + requeueFn: c.handleItem, }, } diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go index 9efef90d..215dd6a5 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/sqs/listener.go @@ -18,22 +18,22 @@ const ( NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit +func (c *consumer) listen(ctx context.Context) { //nolint:gocognit for { select { - case <-j.pauseCh: - j.log.Warn("sqs listener stopped") + case <-c.pauseCh: + c.log.Warn("sqs listener stopped") return default: - message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ - QueueUrl: j.queueURL, - MaxNumberOfMessages: j.prefetch, + message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: c.queueURL, + MaxNumberOfMessages: c.prefetch, AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, MessageAttributeNames: []string{All}, // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - VisibilityTimeout: j.visibilityTimeout, - WaitTimeSeconds: j.waitTime, + VisibilityTimeout: c.visibilityTimeout, + WaitTimeSeconds: c.waitTime, }) if err != nil { @@ -42,10 +42,10 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { // in case of NonExistentQueue - recreate the queue if apiErr.Code == NonExistentQueue { - j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) - _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + c.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = c.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: c.queue, Attributes: c.attributes, Tags: c.tags}) if err != nil { - j.log.Error("create queue", "error", err) + c.log.Error("create queue", "error", err) } // To successfully create a new queue, you must provide a // queue name that adheres to the limits related to the queues @@ -60,27 +60,27 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit } } - j.log.Error("receive message", "error", err) + c.log.Error("receive message", "error", err) continue } for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := j.unpack(&m) + item, err := c.unpack(&m) if err != nil { - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, + _, errD := c.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: c.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) + c.log.Error("message unpack, failed to delete the message from the queue", "error", err) } - j.log.Error("message unpack", "error", err) + c.log.Error("message unpack", "error", err) continue } - j.pq.Insert(item) + c.pq.Insert(item) } } } diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go index 54f61ff5..54f61ff5 100644 --- a/plugins/jobs/drivers/sqs/plugin.go +++ b/plugins/sqs/plugin.go diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go index 82a0fa6c..b76ad0a3 100644 --- a/plugins/status/plugin.go +++ b/plugins/status/plugin.go @@ -85,7 +85,7 @@ func (c *Plugin) status(name string) (Status, error) { const op = errors.Op("checker_plugin_status") svc, ok := c.statusRegistry[name] if !ok { - return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) + return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Status(), nil @@ -96,7 +96,7 @@ func (c *Plugin) ready(name string) (Status, error) { const op = errors.Op("checker_plugin_ready") svc, ok := c.readyRegistry[name] if !ok { - return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) + return Status{}, errors.E(op, errors.Errorf("no such plugin: %s", name)) } return svc.Ready(), nil diff --git a/tests/allocate-failed.php b/tests/allocate-failed.php new file mode 100644 index 00000000..8514ecc0 --- /dev/null +++ b/tests/allocate-failed.php @@ -0,0 +1,18 @@ +<?php + +declare(strict_types=1); + +use Spiral\Goridge\StreamRelay; +use Spiral\RoadRunner\Worker as RoadRunner; + +require __DIR__ . "/vendor/autoload.php"; + +if (file_exists('break')) { + throw new Exception('oops'); +} + +$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); + +while($rr->waitPayload()){ + $rr->respond(new \Spiral\RoadRunner\Payload("")); +} diff --git a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml index 87f46069..92d090d4 100644 --- a/tests/plugins/jobs/beanstalk/.rr-no-global.yaml +++ b/tests/plugins/jobs/beanstalk/.rr-no-global.yaml @@ -29,3 +29,6 @@ jobs: reserve_timeout: 10s consume: [ "test-1" ] + +endure: + log_level: debug diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml new file mode 100644 index 00000000..cdc2655f --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-declare.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml new file mode 100644 index 00000000..804db543 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + timeout: 1 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 2 + + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml new file mode 100644 index 00000000..d375a9a5 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-boltdb-jobs-err.yaml @@ -0,0 +1,24 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +boltdb: + permissions: 0777 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/jobs/boltdb/.rr-no-global.yaml b/tests/plugins/jobs/boltdb/.rr-no-global.yaml new file mode 100644 index 00000000..54aaf3c6 --- /dev/null +++ b/tests/plugins/jobs/boltdb/.rr-no-global.yaml @@ -0,0 +1,41 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../client.php echo pipes" + relay: "pipes" + relay_timeout: "20s" + +logs: + level: error + mode: development + +jobs: + # num logical cores by default + num_pollers: 10 + # 1mi by default + pipeline_size: 100000 + # worker pool configuration + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + # list of broker pipelines associated with endpoints + pipelines: + test-1: + driver: boltdb + prefetch: 100 + file: "rr1.db" + priority: 1 + + test-2: + driver: boltdb + prefetch: 100 + file: "rr2.db" + priority: 1 + + # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually + consume: [ "test-1", "test-2" ] + diff --git a/tests/plugins/jobs/helpers.go b/tests/plugins/jobs/helpers.go index 5067ef9f..6c2d05ca 100644 --- a/tests/plugins/jobs/helpers.go +++ b/tests/plugins/jobs/helpers.go @@ -95,7 +95,7 @@ func pushToPipeDelayed(pipeline string, delay int64) func(t *testing.T) { req := &jobsv1beta.PushRequest{Job: &jobsv1beta.Job{ Job: "some/php/namespace", - Id: "1", + Id: "2", Payload: `{"hello":"world"}`, Headers: map[string]*jobsv1beta.HeaderValue{"test": {Value: []string{"test2"}}}, Options: &jobsv1beta.Options{ diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 48d6515d..949698ec 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 8e74c7cc..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_boltdb_test.go b/tests/plugins/jobs/jobs_boltdb_test.go new file mode 100644 index 00000000..ab36ffa4 --- /dev/null +++ b/tests/plugins/jobs/jobs_boltdb_test.go @@ -0,0 +1,506 @@ +package jobs + +import ( + "net" + "net/rpc" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/golang/mock/gomock" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/boltdb" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/informer" + "github.com/spiral/roadrunner/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/resetter" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" + "github.com/spiral/roadrunner/v2/tests/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + rr1db string = "rr1.db" + rr2db string = "rr2.db" +) + +func TestBoltDBInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-init.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-1", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-2", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + + mockLogger.EXPECT().Info("boltdb listener stopped").Times(4) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + stopCh <- struct{}{} + wg.Wait() + + assert.NoError(t, os.Remove(rr1db)) + assert.NoError(t, os.Remove(rr2db)) +} + +func TestBoltDBDeclare(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBJobsError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-jobs-err.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Error("jobs protocol error", "error", "error", "delay", gomock.Any(), "requeue", gomock.Any()).Times(3) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").Times(2) + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 25) + t.Run("PausePipeline", pausePipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func TestBoltDBNoGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-no-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + require.Error(t, err) +} + +func TestBoltDBStats(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "boltdb/.rr-boltdb-declare.yaml", + Prefix: "rr", + } + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + // general + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("Started RPC service", "address", "tcp://127.0.0.1:6001", "plugins", gomock.Any()).Times(1) + mockLogger.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + mockLogger.EXPECT().Info("job pushed to the queue", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("pipeline active", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(2) + mockLogger.EXPECT().Info("pipeline paused", "pipeline", "test-3", "driver", "boltdb", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("job processed without errors", "ID", gomock.Any(), "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Info("job processing started", "start", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + mockLogger.EXPECT().Warn("pipeline stopped", "pipeline", "test-3", "start", gomock.Any(), "elapsed", gomock.Any()).Times(1) + mockLogger.EXPECT().Info("boltdb listener stopped").AnyTimes() + + err = cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + mockLogger, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &boltdb.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + t.Run("DeclarePipeline", declareBoltDBPipe(rr1db)) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) + time.Sleep(time.Second * 2) + t.Run("PausePipeline", pausePipelines("test-3")) + time.Sleep(time.Second * 2) + t.Run("PushPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + + out := &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(1), out.Active) + assert.Equal(t, int64(1), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, false, out.Ready) + + time.Sleep(time.Second) + t.Run("ResumePipeline", resumePipes("test-3")) + time.Sleep(time.Second * 7) + + out = &jobState.State{} + t.Run("Stats", stats(out)) + + assert.Equal(t, "test-3", out.Pipeline) + assert.Equal(t, "boltdb", out.Driver) + assert.Equal(t, "push", out.Queue) + + assert.Equal(t, int64(0), out.Active) + assert.Equal(t, int64(0), out.Delayed) + assert.Equal(t, int64(0), out.Reserved) + assert.Equal(t, true, out.Ready) + + time.Sleep(time.Second) + t.Run("DestroyPipeline", destroyPipelines("test-3")) + + time.Sleep(time.Second * 5) + stopCh <- struct{}{} + wg.Wait() + assert.NoError(t, os.Remove(rr1db)) +} + +func declareBoltDBPipe(file string) func(t *testing.T) { + return func(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ + "driver": "boltdb", + "name": "test-3", + "prefetch": "100", + "priority": "3", + "file": file, + }} + + er := &jobsv1beta.Empty{} + err = client.Call("jobs.Declare", pipe, er) + assert.NoError(t, err) + } +} diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 98590a96..2890aa9d 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index f0b5697b..951d6227 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -12,11 +12,11 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" @@ -171,9 +171,12 @@ func TestJOBSMetrics(t *testing.T) { signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) tt := time.NewTimer(time.Minute * 3) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { defer tt.Stop() + defer wg.Done() for { select { case e := <-ch: @@ -220,7 +223,7 @@ func TestJOBSMetrics(t *testing.T) { assert.Contains(t, genericOut, "workers_memory_bytes") close(sig) - time.Sleep(time.Second * 2) + wg.Wait() } const getAddr = "http://127.0.0.1:2112/metrics" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 630a059a..2dd2c8db 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -17,11 +17,11 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" @@ -437,15 +437,15 @@ func TestSQSStat(t *testing.T) { time.Sleep(time.Second * 3) - t.Run("DeclareSQSPipeline", declareSQSPipe) - t.Run("ConsumeSQSPipeline", resumePipes("test-3")) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("DeclarePipeline", declareSQSPipe) + t.Run("ConsumePipeline", resumePipes("test-3")) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) - t.Run("PauseSQSPipeline", pausePipelines("test-3")) + t.Run("PausePipeline", pausePipelines("test-3")) time.Sleep(time.Second) - t.Run("PushSQSPipelineDelayed", pushToPipeDelayed("test-3", 5)) - t.Run("PushSQSPipeline", pushToPipe("test-3")) + t.Run("PushPipelineDelayed", pushToPipeDelayed("test-3", 5)) + t.Run("PushPipeline", pushToPipe("test-3")) time.Sleep(time.Second) out := &jobState.State{} @@ -474,7 +474,7 @@ func TestSQSStat(t *testing.T) { assert.Equal(t, int64(0), out.Delayed) assert.Equal(t, int64(0), out.Reserved) - t.Run("DestroyEphemeralPipeline", destroyPipelines("test-3")) + t.Run("DestroyPipeline", destroyPipelines("test-3")) time.Sleep(time.Second * 5) stopCh <- struct{}{} diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index f6521e8d..80fed8eb 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -11,15 +11,15 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index ced1c5fe..e757a9e6 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,11 +12,11 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached" "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" |