diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:50:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-10-27 22:50:03 +0300 |
commit | c8c3f9f113eae13aa37cf92043b288bb0c68a622 (patch) | |
tree | 42f8ab386735d5f8b002907d07249e94b4c10a12 | |
parent | 1f62e21020cc3014e9eb2dc33c154de6dd5b22d5 (diff) | |
parent | ab591e7f122e28857cef00c905a8125992ea3cdf (diff) |
[#838]: feat(events): events package deep refactoringv2.6.0-alpha.1
[#838]: feat(events): events package deep refactoring
35 files changed, 1025 insertions, 864 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 380f4874..47bc475f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ # CHANGELOG -## v2.5.0 (20.10.2021) +# v2.6.0 (-.-.2021) + +### ๐ New: + +- โ๏ธ New internal message bus. Available globally. Supports wildcard subscriptions (for example: `http.*` will subscribe you to the all events coming from the `http` plugin). The subscriptions can be made from any RR plugin to any RR plugin. + + +# v2.5.0 (20.10.2021) # ๐ Breaking change: @@ -12,9 +19,9 @@ ```yaml broadcast: - default: - driver: memory - interval: 1 + default: + driver: memory + interval: 1 ``` ### New style: @@ -23,7 +30,7 @@ broadcast: broadcast: default: driver: memory - config: {} <--------------- NEW + config: { } <--------------- NEW ``` ```yaml @@ -37,8 +44,8 @@ kv: memcached-rr: driver: memcached config: <--------------- NEW - addr: - - "127.0.0.1:11211" + addr: + - "127.0.0.1:11211" broadcast: default: @@ -51,8 +58,11 @@ broadcast: ## ๐ New: - โ๏ธ **[BETA]** GRPC plugin update to v2. -- โ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. This is the new home for the roadrunner plugins with documentation, configuration samples, and common problems. -- โ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is the new configuration: +- โ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. This is the new home for the + roadrunner plugins with documentation, configuration samples, and common problems. +- โ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is + the new configuration: + ```yaml ssl: # Host and port to listen on (eg.: `127.0.0.1:443`). @@ -105,23 +115,25 @@ broadcast: - โ๏ธ Add a new option to the `logs` plugin to configure the line ending. By default, used `\n`. **New option**: + ```yaml # Logs plugin settings logs: - (....) - # Line ending - # - # Default: "\n". - line_ending: "\n" + (....) + # Line ending + # + # Default: "\n". + line_ending: "\n" ``` - โ๏ธ HTTP [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level. + ```yaml http: address: 127.0.0.1:55555 max_request_size: 1024 access_logs: true <-------- Access Logs ON/OFF - middleware: [] + middleware: [ ] pool: num_workers: 2 @@ -129,13 +141,16 @@ http: allocate_timeout: 60s destroy_timeout: 60s ``` -- โ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). - Middleware reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is smaller than 10MB, the middleware fits the buffer to the file size. + +- โ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). Middleware + reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is + smaller than 10MB, the middleware fits the buffer to the file size. + ```yaml http: address: 127.0.0.1:44444 max_request_size: 1024 - middleware: ["sendfile"] <----- NEW MIDDLEWARE + middleware: [ "sendfile" ] <----- NEW MIDDLEWARE pool: num_workers: 2 @@ -145,6 +160,7 @@ http: ``` - โ๏ธ Service plugin now supports env variables passing to the script/executable/binary/any like in the `server` plugin: + ```yaml service: some_service_1: @@ -152,21 +168,25 @@ service: process_num: 1 exec_timeout: 5s # s,m,h (seconds, minutes, hours) remain_after_exit: true - env: <----------------- NEW + env: <----------------- NEW foo: "BAR" restart_sec: 1 ``` - โ๏ธ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key: + ```yaml server: - command: "./script.sh OR sh script.sh" <--- UPDATED - relay: "pipes" - relay_timeout: "20s" + command: "./script.sh OR sh script.sh" <--- UPDATED + relay: "pipes" + relay_timeout: "20s" ``` -The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`. + +The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can +close `stdin`, `stdout` or `stderr`. - โ๏ธ Nats jobs driver support - [PR](https://github.com/spiral/roadrunner-plugins/pull/68). + ```yaml nats: addr: "demo.nats.io" @@ -194,12 +214,14 @@ jobs: consume: [ "test-1" ] ``` -- Driver uses NATS JetStream API and not compatible with non-js API. + +- Driver uses NATS JetStream API and is not compatible with non-js API. -- โ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a specified in the response queue. - Limitations: - - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations for the responses into the other queues (tubes, subjects). +- โ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a + specified in the response queue. Limitations: + - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations + for the responses into the other queues (tubes, subjects). - Driver uses the same endpoint (address) to send the response as specified in the configuration. ## ๐ฉน Fixes: @@ -215,45 +237,53 @@ jobs: - ๐ฆ roadrunner `v2.5.0` - ๐ฆ roadrunner-plugins `v2.5.0` - ๐ฆ roadrunner-temporal `v1.0.10` -- ๐ฆ endure `v1.0.5` +- ๐ฆ endure `v1.0.6` - ๐ฆ goridge `v3.2.3` ## v2.4.1 (13.09.2021) ## ๐ฉน Fixes: -- ๐ Fix: bug with not-idempotent call to the `attributes.Init`. -- ๐ Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the pipeline in the configuration. +- ๐ Fix: bug with not-idempotent call to the `attributes.Init`. +- ๐ Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the + pipeline in the configuration. ## v2.4.0 (02.09.2021) ## ๐ Internal BC: -- ๐จ Pool, worker interfaces: payload now passed and returned by the pointer. +- ๐จ Pool, worker interfaces: payload now passed and returned by the pointer. ## ๐ New: -- โ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) -- โ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) -- โ๏ธ Support for the Docker images via GitHub packages. -- โ๏ธ Go 1.17 support for the all spiral packages. +- โ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. + Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered + by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726) +- โ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port` + , `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other + plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2) +- โ๏ธ Support for the Docker images via GitHub packages. +- โ๏ธ Go 1.17 support for the all spiral packages. ## ๐ฉน Fixes: -- ๐ Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750). -- ๐ Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). -- ๐ Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79). +- ๐ Fix: fixed bug with goroutines waiting on the internal worker's container + channel, [issue](https://github.com/spiral/roadrunner/issues/750). +- ๐ Fix: RR become unresponsive when new workers failed to + re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772). +- ๐ Fix: add `debug` pool config key to the `.rr.yaml` + configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79). ## ๐ฆ Packages: -- ๐ฆ Update goridge to `v3.2.1` -- ๐ฆ Update temporal to `v1.0.9` -- ๐ฆ Update endure to `v1.0.4` +- ๐ฆ Update goridge to `v3.2.1` +- ๐ฆ Update temporal to `v1.0.9` +- ๐ฆ Update endure to `v1.0.4` ## ๐ Summary: -- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1) -- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1) +- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1) +- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1) --- @@ -261,13 +291,13 @@ jobs: ## ๐ฉน Fixes: -- ๐ Fix: Do not call the container's Stop method after the container stopped by an error. -- ๐ Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749) -- ๐ Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749) +- ๐ Fix: Do not call the container's Stop method after the container stopped by an error. +- ๐ Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749) +- ๐ Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749) ## ๐ Summary: -- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1) +- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1) --- @@ -275,32 +305,32 @@ jobs: ## ๐ New: -- โ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc` - folder. [PR](https://github.com/spiral/roadrunner/pull/732) -- โ๏ธ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736) +- โ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc` + folder. [PR](https://github.com/spiral/roadrunner/pull/732) +- โ๏ธ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736) ## ๐ฉน Fixes: -- ๐ Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit - reached [PR](https://github.com/spiral/roadrunner/pull/738) -- ๐ Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next - request [PR](https://github.com/spiral/roadrunner/pull/738) -- ๐ Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717) - , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) -- ๐ Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720) -- ๐ Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128) -- ๐ Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727) +- ๐ Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit + reached [PR](https://github.com/spiral/roadrunner/pull/738) +- ๐ Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next + request [PR](https://github.com/spiral/roadrunner/pull/738) +- ๐ Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717) + , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719) +- ๐ Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720) +- ๐ Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128) +- ๐ Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727) ## ๐ฆ Packages: -- ๐ฆ Update goridge to `v3.1.4` -- ๐ฆ Update temporal to `v1.0.8` +- ๐ฆ Update goridge to `v3.1.4` +- ๐ฆ Update temporal to `v1.0.8` ## ๐ Summary: -- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1) -- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1) -- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1) +- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1) +- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1) +- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1) --- @@ -308,36 +338,36 @@ jobs: ## ๐ New: -- โ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of - thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus - on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) -- โ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the - hood. [Issue](https://github.com/spiral/roadrunner/issues/711) -- โ๏ธ Json-schemas for the config file v1.0 (it also registered - in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) -- โ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) -- โ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error - code. [Issue](https://github.com/spiral/roadrunner/issues/659) -- โ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration) - . [Issue](https://github.com/spiral/roadrunner/issues/489) -- โ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` - scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) -- โ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation) - , [Issue](https://github.com/spiral/roadrunner/issues/545) +- โ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of + thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus + on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) +- โ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the + hood. [Issue](https://github.com/spiral/roadrunner/issues/711) +- โ๏ธ Json-schemas for the config file v1.0 (it also registered + in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) +- โ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) +- โ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error + code. [Issue](https://github.com/spiral/roadrunner/issues/659) +- โ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration) + . [Issue](https://github.com/spiral/roadrunner/issues/489) +- โ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh` + scripts. [Issue](https://github.com/spiral/roadrunner/issues/658) +- โ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation) + , [Issue](https://github.com/spiral/roadrunner/issues/545) ## ๐ฉน Fixes: -- ๐ Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) -- ๐ Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in - logs: [Bug](https://github.com/spiral/roadrunner/issues/659) -- ๐ Fix: Error message will be properly shown in the log in case of `SoftJob` - error: [Bug](https://github.com/spiral/roadrunner/issues/691) -- ๐ Fix: Wrong applied middlewares for the `fcgi` server leads to the - NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) +- ๐ Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) +- ๐ Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in + logs: [Bug](https://github.com/spiral/roadrunner/issues/659) +- ๐ Fix: Error message will be properly shown in the log in case of `SoftJob` + error: [Bug](https://github.com/spiral/roadrunner/issues/691) +- ๐ Fix: Wrong applied middlewares for the `fcgi` server leads to the + NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) ## ๐ฆ Packages: -- ๐ฆ Update goridge to `v3.1.0` +- ๐ฆ Update goridge to `v3.1.0` --- @@ -345,9 +375,9 @@ jobs: ## ๐ฉน Fixes: -- ๐ Fix: revert static plugin. It stays as a separate plugin on the main route (`/`) and supports all the previously - announced features. -- ๐ Fix: remove `build` and other old targets from the Makefile. +- ๐ Fix: revert static plugin. It stays as a separate plugin on the main route (`/`) and supports all the previously + announced features. +- ๐ Fix: remove `build` and other old targets from the Makefile. --- @@ -355,21 +385,21 @@ jobs: ## ๐ New: -- โ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate - file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach` - , `If-None-Match`, `If-Range`, `Last-Modified` - and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag` - , `weak` and `pattern`. +- โ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate + file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach` + , `If-None-Match`, `If-Range`, `Last-Modified` + and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag` + , `weak` and `pattern`. - ### Option `always` was deleted from the plugin. + ### Option `always` was deleted from the plugin. -- โ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime. +- โ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime. ## ๐ฉน Fixes: -- ๐ Fix: issue with wrong ordered middlewares (reverse). Now the order is correct. -- ๐ Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option. -- ๐ Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic). +- ๐ Fix: issue with wrong ordered middlewares (reverse). Now the order is correct. +- ๐ Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option. +- ๐ Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic). --- @@ -377,102 +407,102 @@ jobs: ## ๐ฉน Fixes: -- ๐ Fix: issue with endure provided wrong logger interface implementation. +- ๐ Fix: issue with endure provided wrong logger interface implementation. ## v2.1.0 (27.04.2021) ## ๐ New: -- โ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service) -- โ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers. +- โ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service) +- โ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers. ## ๐ฉน Fixes: -- ๐ Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented. -- ๐ Fix: http handler was without log listener after `rr reset`. +- ๐ Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented. +- ๐ Fix: http handler was without log listener after `rr reset`. ## v2.0.4 (06.04.2021) ## ๐ New: -- โ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam). -- โ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI - flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam). -- ๐ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger - severity level should be at least `INFO`). -- ๐ Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there - are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state. +- โ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam). +- โ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI + flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam). +- ๐ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger + severity level should be at least `INFO`). +- ๐ Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there + are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state. ## ๐ฉน Fixes: -- ๐ Fix: bug with the temporal worker which does not follow general graceful shutdown period. +- ๐ Fix: bug with the temporal worker which does not follow general graceful shutdown period. ## v2.0.3 (29.03.2021) ## ๐ฉน Fixes: -- ๐ Fix: slow last response when reached `max_jobs` limit. +- ๐ Fix: slow last response when reached `max_jobs` limit. ## v2.0.2 (06.04.2021) -- ๐ Fix: Bug with required Root CA certificate for the SSL, now it's optional. -- ๐ Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop). -- ๐ New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at - the `Info` log level. -- โก New: Builds for the Mac with the M1 processor (arm64). -- ๐ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small - improvements. +- ๐ Fix: Bug with required Root CA certificate for the SSL, now it's optional. +- ๐ Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop). +- ๐ New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at + the `Info` log level. +- โก New: Builds for the Mac with the M1 processor (arm64). +- ๐ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small + improvements. ## v2.0.1 (09.03.2021) -- ๐ Fix: incorrect PHP command validation -- ๐ Fix: ldflags properly inject RR version -- โฌ๏ธ Update: README, links to the go.pkg from v1 to v2 -- ๐ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16 -- ๐ฆ Bump Endure container to v1.0.0. +- ๐ Fix: incorrect PHP command validation +- ๐ Fix: ldflags properly inject RR version +- โฌ๏ธ Update: README, links to the go.pkg from v1 to v2 +- ๐ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16 +- ๐ฆ Bump Endure container to v1.0.0. ## v2.0.0 (02.03.2021) -- โ๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin. -- ๐ New plugin system with auto-recovery, easier plugin API. -- ๐ New `logger` plugin to configure logging for each plugin individually. -- ๐ Up to 50% performance increase in HTTP workloads. -- โ๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale. -- โ๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior). -- โ Eliminate `limit` service, now each worker pool includes `supervisor` configuration. -- ๐ New resetter, informer plugins to perform hot reloads and observe loggers in a system. -- ๐ซ Expose more HTTP plugin configuration options. -- ๐ Headers, static and gzip services now located in HTTP config. -- ๐ Ability to configure the middleware sequence. -- ๐ฃ Faster Goridge protocol (eliminated 50% of syscalls). -- ๐พ Add support for binary payloads for RPC (`msgpack`). -- ๐ Server no longer stops when a PHP worker dies (attempts to restart). -- ๐พ New RR binary server downloader. -- ๐ฃ Echoing no longer breaks execution (yay!). -- ๐ Migration to ZapLogger instead of Logrus. -- ๐ฅ RR can no longer stuck when studding down with broken tasks in a pipeline. -- ๐งช More tests, more static analysis. -- ๐ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins. +- โ๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin. +- ๐ New plugin system with auto-recovery, easier plugin API. +- ๐ New `logger` plugin to configure logging for each plugin individually. +- ๐ Up to 50% performance increase in HTTP workloads. +- โ๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale. +- โ๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior). +- โ Eliminate `limit` service, now each worker pool includes `supervisor` configuration. +- ๐ New resetter, informer plugins to perform hot reloads and observe loggers in a system. +- ๐ซ Expose more HTTP plugin configuration options. +- ๐ Headers, static and gzip services now located in HTTP config. +- ๐ Ability to configure the middleware sequence. +- ๐ฃ Faster Goridge protocol (eliminated 50% of syscalls). +- ๐พ Add support for binary payloads for RPC (`msgpack`). +- ๐ Server no longer stops when a PHP worker dies (attempts to restart). +- ๐พ New RR binary server downloader. +- ๐ฃ Echoing no longer breaks execution (yay!). +- ๐ Migration to ZapLogger instead of Logrus. +- ๐ฅ RR can no longer stuck when studding down with broken tasks in a pipeline. +- ๐งช More tests, more static analysis. +- ๐ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins. ## v2.0.0-RC.4 (20.02.2021) -- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550). -- Endure container update to v1.0.0-RC.2 version. -- Remove unneeded mutex from the `http.Workers` method. -- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557). -- Add static, headers, status, gzip plugins to the `main.go`. -- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error. +- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550). +- Endure container update to v1.0.0-RC.2 version. +- Remove unneeded mutex from the `http.Workers` method. +- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557). +- Add static, headers, status, gzip plugins to the `main.go`. +- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error. ## v2.0.0-RC.3 (17.02.2021) -- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config) -- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development` -- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. Reference: - ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph) +- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config) +- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development` +- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. Reference: + ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph) ## v2.0.0-RC.2 (11.02.2021) -- Update RR to version v2.0.0-RC.2 -- Update Temporal plugin to version v2.0.0-RC.1 -- Update Goridge to version v3.0.1 -- Update Endure to version v1.0.0-RC.1 +- Update RR to version v2.0.0-RC.2 +- Update Temporal plugin to version v2.0.0-RC.1 +- Update Goridge to version v3.0.1 +- Update Endure to version v1.0.0-RC.1 @@ -14,6 +14,7 @@ test_coverage: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.out -covermode=atomic ./priority_queue go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/events.out -covermode=atomic ./events echo 'mode: atomic' > ./coverage-ci/summary.txt tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt @@ -25,3 +26,4 @@ test: ## Run application tests go test -v -race -tags=debug ./worker_watcher go test -v -race -tags=debug ./bst go test -v -race -tags=debug ./priority_queue + go test -v -race -tags=debug ./events diff --git a/events/docs/events.md b/events/docs/events.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/events/docs/events.md diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..b7396653 --- /dev/null +++ b/events/events.go @@ -0,0 +1,143 @@ +package events + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..e15c55d6 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,94 @@ +package events + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEvenHandler(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} + +func TestEvenHandler2(t *testing.T) { + eh, id := Bus() + eh2, id2 := Bus() + defer eh.Unsubscribe(id) + defer eh2.Unsubscribe(id2) + + ch := make(chan Event, 100) + ch2 := make(chan Event, 100) + err := eh2.SubscribeP(id2, "http.EventJobOK", ch) + require.NoError(t, err) + + err = eh.SubscribeP(id, "http.EventJobOK", ch2) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch2 + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) + + l := eh.Len() + require.Equal(t, uint(2), l) + + eh.Unsubscribe(id) + time.Sleep(time.Second) + + l = eh.Len() + require.Equal(t, uint(1), l) + + eh2.Unsubscribe(id2) + time.Sleep(time.Second) + + l = eh.Len() + require.Equal(t, uint(0), l) +} + +func TestEvenHandler3(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "EventJobOK", ch) + require.Error(t, err) +} + +func TestEvenHandler4(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + err := eh.SubscribeP(id, "EventJobOK", nil) + require.Error(t, err) +} + +func TestEvenHandler5(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go new file mode 100644 index 00000000..cd0dca71 --- /dev/null +++ b/events/eventsbus.go @@ -0,0 +1,170 @@ +package events + +import ( + "fmt" + "strings" + "sync" + + "github.com/spiral/errors" +) + +type sub struct { + pattern string + w *wildcard + events chan<- Event +} + +type eventsBus struct { + sync.RWMutex + subscribers sync.Map + internalEvCh chan Event + stop chan struct{} +} + +func newEventsBus() *eventsBus { + return &eventsBus{ + internalEvCh: make(chan Event, 100), + stop: make(chan struct{}), + } +} + +/* +http.* <- +*/ + +// SubscribeAll for all RR events +// returns subscriptionID +func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + + return eb.subscribe(subID, "*", ch) +} + +// SubscribeP pattern like "pluginName.EventType" +func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + + return eb.subscribe(subID, pattern, ch) +} + +func (eb *eventsBus) Unsubscribe(subID string) { + eb.subscribers.Delete(subID) +} + +func (eb *eventsBus) UnsubscribeP(subID, pattern string) { + if sb, ok := eb.subscribers.Load(subID); ok { + eb.Lock() + defer eb.Unlock() + + sbArr := sb.([]*sub) + + for i := 0; i < len(sbArr); i++ { + if sbArr[i].pattern == pattern { + sbArr[i] = sbArr[len(sbArr)-1] + sbArr = sbArr[:len(sbArr)-1] + // replace with new array + eb.subscribers.Store(subID, sbArr) + return + } + } + } +} + +// Send sends event to the events bus +func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + + eb.internalEvCh <- ev +} + +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + +func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { + eb.Lock() + defer eb.Unlock() + w, err := newWildcard(pattern) + if err != nil { + return err + } + + if sb, ok := eb.subscribers.Load(subID); ok { + // at this point we are confident that sb is a []*sub type + subArr := sb.([]*sub) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil + } + + subArr := make([]*sub, 0, 5) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil +} + +func (eb *eventsBus) handleEvents() { + for { //nolint:gosimple + select { + case ev := <-eb.internalEvCh: + // http.WorkerError for example + wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) + + eb.subscribers.Range(func(key, value interface{}) bool { + vsub := value.([]*sub) + + for i := 0; i < len(vsub); i++ { + if vsub[i].w.match(wc) { + select { + case vsub[i].events <- ev: + return true + default: + return true + } + } + } + + return true + }) + } + } +} diff --git a/events/general.go b/events/general.go deleted file mode 100755 index 5cf13e10..00000000 --- a/events/general.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" -) - -const UnknownEventType string = "Unknown event type" - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() Handler { - return &HandlerImpl{listeners: make([]Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/events/grpc_event.go b/events/grpc_event.go deleted file mode 100644 index 31ff4957..00000000 --- a/events/grpc_event.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "time" - - "google.golang.org/grpc" -) - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk G = iota + 13000 - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr -) - -type G int64 - -func (ev G) String() string { - switch ev { - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - } - return UnknownEventType -} - -// JobEvent represent job event. -type GRPCEvent struct { - Event G - // Info contains unary call info. - Info *grpc.UnaryServerInfo - // Error associated with event. - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/init.go b/events/init.go new file mode 100644 index 00000000..25e92fc5 --- /dev/null +++ b/events/init.go @@ -0,0 +1,20 @@ +package events + +import ( + "sync" + + "github.com/google/uuid" +) + +var evBus *eventsBus +var onInit = &sync.Once{} + +func Bus() (*eventsBus, string) { + onInit.Do(func() { + evBus = newEventsBus() + go evBus.handleEvents() + }) + + // return events bus with subscriberID + return evBus, uuid.NewString() +} diff --git a/events/interface.go b/events/interface.go deleted file mode 100644 index 7d57e4d0..00000000 --- a/events/interface.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // NumListeners return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/events/jobs_events.go b/events/jobs_events.go deleted file mode 100644 index f65ede67..00000000 --- a/events/jobs_events.go +++ /dev/null @@ -1,81 +0,0 @@ -package events - -import ( - "time" -) - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK J = iota + 12000 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady -) - -type J int64 - -func (ev J) String() string { - switch ev { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - } - return UnknownEventType -} - -// JobEvent represent job event. -type JobEvent struct { - Event J - // String is job id. - ID string - // Pipeline name - Pipeline string - // Associated driver name (amqp, ephemeral, etc) - Driver string - // Error for the jobs/pipes errors - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/pool_events.go b/events/pool_events.go deleted file mode 100644 index eb28df6a..00000000 --- a/events/pool_events.go +++ /dev/null @@ -1,71 +0,0 @@ -package events - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 10000 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - } - return UnknownEventType -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} - Error error -} diff --git a/events/types.go b/events/types.go new file mode 100644 index 00000000..65a76d15 --- /dev/null +++ b/events/types.go @@ -0,0 +1,46 @@ +package events + +type EventBus interface { + SubscribeAll(subID string, ch chan<- Event) error + SubscribeP(subID string, pattern string, ch chan<- Event) error + Unsubscribe(subID string) + UnsubscribeP(subID, pattern string) + Len() uint + Send(ev Event) +} + +type Event interface { + Plugin() string + Type() EventType + Message() string +} + +type RREvent struct { + // event typ + typ EventType + // plugin + plugin string + // message + message string +} + +// NewEvent initializes new event +func NewEvent(t EventType, plugin string, msg string) *RREvent { + return &RREvent{ + typ: t, + plugin: plugin, + message: msg, + } +} + +func (r *RREvent) Type() EventType { + return r.typ +} + +func (r *RREvent) Message() string { + return r.message +} + +func (r *RREvent) Plugin() string { + return r.plugin +} diff --git a/events/wildcard.go b/events/wildcard.go new file mode 100644 index 00000000..b4c28ae1 --- /dev/null +++ b/events/wildcard.go @@ -0,0 +1,43 @@ +package events + +import ( + "strings" + + "github.com/spiral/errors" +) + +type wildcard struct { + prefix string + suffix string +} + +func newWildcard(pattern string) (*wildcard, error) { + // Normalize + origin := strings.ToLower(pattern) + i := strings.IndexByte(origin, '*') + + /* + http.* + * + *.WorkerError + */ + if i == -1 { + dotI := strings.IndexByte(pattern, '.') + + if dotI == -1 { + // http.SuperEvent + return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*") + } + + return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil + } + + // pref: http. + // suff: * + return &wildcard{origin[0:i], origin[i+1:]}, nil +} + +func (w wildcard) match(s string) bool { + s = strings.ToLower(s) + return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) +} diff --git a/events/wildcard_test.go b/events/wildcard_test.go new file mode 100644 index 00000000..230ef673 --- /dev/null +++ b/events/wildcard_test.go @@ -0,0 +1,48 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWildcard(t *testing.T) { + w, err := newWildcard("http.*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.True(t, w.match("http.****")) + + // *.* -> * + w, err = newWildcard("*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.True(t, w.match("https.SuperEvent")) + assert.True(t, w.match("")) + assert.True(t, w.match("*")) + assert.True(t, w.match("****")) + assert.True(t, w.match("http.****")) + + w, err = newWildcard("*.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) +} diff --git a/events/worker_events.go b/events/worker_events.go deleted file mode 100644 index 6b80df61..00000000 --- a/events/worker_events.go +++ /dev/null @@ -1,40 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - } - return UnknownEventType -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} diff --git a/pool/static_pool.go b/pool/static_pool.go index 91bd1c2c..11112e72 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "os/exec" "time" @@ -14,8 +15,12 @@ import ( workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" ) -// StopRequest can be sent by worker to indicate that restart is required. -const StopRequest = "{\"stop\":true}" +const ( + // StopRequest can be sent by worker to indicate that restart is required. + StopRequest = `{"stop":true}` + // pluginName ... + pluginName = "pool" +) // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) @@ -34,11 +39,8 @@ type StaticPool struct { // creates and connects to stack factory transport.Factory - // distributes the events - events events.Handler - - // saved list of event listeners - listeners []events.Listener + events events.EventBus + eventsID string // manages worker states and TTLs ww Watcher @@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg cfg.MaxJobs = 1 } + eb, id := events.Bus() p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - events: events.NewEventsHandler(), + cfg: cfg, + cmd: cmd, + factory: factory, + events: eb, + eventsID: id, } // add pool options @@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) @@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + sp := supervisorWrapper(p, eb, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return p, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *StaticPool) { - p.listeners = listeners - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.Listener) { - sp.events.AddListener(listener) -} - // GetConfig returns associated pool configuration. Immutable. func (sp *StaticPool) GetConfig() interface{} { return sp.cfg @@ -205,7 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()))) } } @@ -227,7 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)}) + sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err))) return nil, errors.E(op, err) } // else if err not nil - return error @@ -238,6 +228,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work // Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) sp.ww.Destroy(ctx) } @@ -246,12 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err))) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -272,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -280,7 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -296,7 +287,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) + w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd()) if err != nil { return nil, err } @@ -304,10 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, - }) + sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid()))) return sw, nil } } @@ -329,7 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) return nil, err } @@ -346,7 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 9861f0d8..717d301e 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -18,6 +18,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfg = &Config{ @@ -167,26 +168,18 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(), cfg, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -196,22 +189,23 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + event := <-ch + if !strings.Contains(event.Message(), "undefined_function()") { + t.Fatal("event should contain undefiled function()") + } p.Destroy(ctx) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() + // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) + require.NoError(t, err) var cfg2 = &Config{ NumWorkers: 1, @@ -224,7 +218,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), cfg2, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -242,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, 1, len(p.Workers())) // first creation - <-ev + <-ch // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { @@ -250,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } // re-creation - <-ev + <-ch list := p.Workers() for _, w := range list { @@ -496,15 +489,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) + require.NoError(t, err) p, err := Initialize( ctx, @@ -518,7 +508,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -532,7 +521,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + <-ch p.Destroy(ctx) } diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 99af168c..1a94f6a0 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -12,7 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/worker" ) -const MB = 1024 * 1024 +const ( + MB = 1024 * 1024 + supervisorName string = "supervisor" +) // NSEC_IN_SEC nanoseconds in second const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck @@ -25,16 +29,16 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig - events events.Handler + events events.EventBus pool Pool stopCh chan struct{} mu *sync.RWMutex } -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, - events: events, + events: eb, pool: pool, mu: &sync.RWMutex{}, stopCh: make(chan struct{}), @@ -148,7 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -168,7 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -223,7 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index aca379c6..eb3c37dd 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -265,6 +265,11 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { assert.Empty(t, resp.Context) time.Sleep(time.Second * 2) + + if len(p.Workers()) < 1 { + t.Fatal("should be at least 1 worker") + return + } // should be destroyed, state should be Ready, not Invalid assert.NotEqual(t, pid, p.Workers()[0].Pid()) assert.Equal(t, int64(1), p.Workers()[0].State().Value()) @@ -326,14 +331,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { }, } - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventMaxMemory { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) + require.NoError(t, err) // constructed // max memory @@ -344,7 +346,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, - AddListeners(listener), ) assert.NoError(t, err) @@ -359,7 +360,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) - <-block + <-ch p.Destroy(context.Background()) } diff --git a/tests/worker-ok.php b/tests/psr-worker-post.php index 63558b0f..2f54af5b 100644 --- a/tests/worker-ok.php +++ b/tests/psr-worker-post.php @@ -1,14 +1,16 @@ <?php + /** * @var Goridge\RelayInterface $relay */ + use Spiral\Goridge; use Spiral\RoadRunner; ini_set('display_errors', 'stderr'); require __DIR__ . "/vendor/autoload.php"; -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$worker = RoadRunner\Worker::create(); $psr7 = new RoadRunner\Http\PSR7Worker( $worker, new \Nyholm\Psr7\Factory\Psr17Factory(), @@ -19,7 +21,8 @@ $psr7 = new RoadRunner\Http\PSR7Worker( while ($req = $psr7->waitRequest()) { try { $resp = new \Nyholm\Psr7\Response(); - $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? ''); + $resp->getBody()->write((string) $req->getBody()); + $psr7->respond($resp); } catch (\Throwable $e) { $psr7->getWorker()->error((string)$e); diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php deleted file mode 100644 index 5c9c80e6..00000000 --- a/tests/temporal-worker.php +++ /dev/null @@ -1,34 +0,0 @@ -<?php - -declare(strict_types=1); - -require __DIR__ . '/vendor/autoload.php'; - -/** - * @param string $dir - * @return array<string> - */ -$getClasses = static function (string $dir): iterable { - $files = glob($dir . '/*.php'); - - foreach ($files as $file) { - yield substr(basename($file), 0, -4); - } -}; - -$factory = \Temporal\WorkerFactory::create(); - -$worker = $factory->newWorker('default'); - -// register all workflows -foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) { - $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name); -} - -// register all activity -foreach ($getClasses(__DIR__ . '/src/Activity') as $name) { - $class = 'Temporal\\Tests\\Activity\\' . $name; - $worker->registerActivityImplementations(new $class); -} - -$factory->run(); diff --git a/tests/worker-cors.php b/tests/worker-cors.php deleted file mode 100644 index ea3c986c..00000000 --- a/tests/worker-cors.php +++ /dev/null @@ -1,15 +0,0 @@ -<?php - -use Spiral\RoadRunner\Worker; -use Spiral\RoadRunner\Http\HttpWorker; - -ini_set('display_errors', 'stderr'); -require __DIR__ . '/vendor/autoload.php'; - -$http = new HttpWorker(Worker::create()); - -while ($req = $http->waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-deny.php b/tests/worker-deny.php deleted file mode 100644 index 6dc993f6..00000000 --- a/tests/worker-deny.php +++ /dev/null @@ -1,30 +0,0 @@ -<?php -/** - * @var Goridge\RelayInterface $relay - */ -use Spiral\Goridge; -use Spiral\RoadRunner; - -ini_set('display_errors', 'stderr'); -require __DIR__ . "/vendor/autoload.php"; - -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); -$psr7 = new RoadRunner\Http\PSR7Worker( - $worker, - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() -); - -while ($req = $psr7->waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - if ($req->getAttribute('ws:joinServer')) { - $psr7->respond($resp->withStatus(200)); - } else { - $psr7->respond($resp->withStatus(401)); - } - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/tests/worker-origin.php b/tests/worker-origin.php deleted file mode 100644 index 6ce4de59..00000000 --- a/tests/worker-origin.php +++ /dev/null @@ -1,14 +0,0 @@ -<?php - -use Spiral\RoadRunner\Worker; -use Spiral\RoadRunner\Http\HttpWorker; - -require __DIR__ . '/vendor/autoload.php'; - -$http = new HttpWorker(Worker::create()); - -while ($req = $http->waitRequest()) { - $http->respond(200, 'Response', [ - 'Access-Control-Allow-Origin' => ['*'] - ]); -} diff --git a/tests/worker-stop.php b/tests/worker-stop.php deleted file mode 100644 index 83fc5710..00000000 --- a/tests/worker-stop.php +++ /dev/null @@ -1,26 +0,0 @@ -<?php -/** - * @var Goridge\RelayInterface $relay - */ -use Spiral\Goridge; -use Spiral\RoadRunner; - -ini_set('display_errors', 'stderr'); -require __DIR__ . "/vendor/autoload.php"; - -$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); -$psr7 = new RoadRunner\Http\PSR7Worker( - $worker, - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory(), - new \Nyholm\Psr7\Factory\Psr17Factory() -); - -while ($req = $psr7->waitRequest()) { - try { - $resp = new \Nyholm\Psr7\Response(); - $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401)); - } catch (\Throwable $e) { - $psr7->getWorker()->error((string)$e); - } -} diff --git a/transport/interface.go b/transport/interface.go index e20f2b0b..0d6c8e8b 100644 --- a/transport/interface.go +++ b/transport/interface.go @@ -4,7 +4,6 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/worker" ) @@ -12,10 +11,10 @@ import ( type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorker(*exec.Cmd) (*worker.Process, error) // Close the factory and underlying connections. Close() error } diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 3ea8fd98..c70b3f65 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -5,7 +5,6 @@ import ( "os/exec" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" ) @@ -27,10 +26,10 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { spCh := make(chan sr) go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case spCh <- sr{ @@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 45b7aef8..256176de 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState2(t *testing.T) { @@ -105,21 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid2(t *testing.T) { @@ -368,17 +369,14 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -390,11 +388,11 @@ func Test_Broken2(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index b4ba8c87..0f527cd5 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState(t *testing.T) { @@ -125,22 +126,21 @@ func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid(t *testing.T) { @@ -433,17 +433,14 @@ func Test_Broken(t *testing.T) { t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -455,11 +452,10 @@ func Test_Broken(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index dfffdf4e..06d7000d 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" @@ -83,12 +82,12 @@ type socketSpawn struct { } // SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { c := make(chan socketSpawn) go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case c <- socketSpawn{ @@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go index 363a3510..2db2fd40 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start2(t *testing.T) { @@ -110,21 +111,20 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Invalid2(t *testing.T) { @@ -162,18 +162,13 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -198,7 +193,11 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function() string") + } } func Test_Tcp_Echo2(t *testing.T) { @@ -273,21 +272,20 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout2(t *testing.T) { @@ -331,18 +329,13 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -367,7 +360,11 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Unix_Echo2(t *testing.T) { diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index d517d026..7b28a847 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start(t *testing.T) { @@ -124,21 +125,20 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Timeout(t *testing.T) { @@ -203,18 +203,13 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -239,7 +234,11 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Tcp_Echo(t *testing.T) { @@ -368,21 +367,20 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout(t *testing.T) { @@ -444,20 +442,13 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -481,7 +472,12 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } + wg.Wait() } diff --git a/worker/worker.go b/worker/worker.go index 38a1e9ac..05c6dd0d 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,18 +12,24 @@ import ( "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" ) type Options func(p *Process) +const ( + workerEventsName string = "worker" +) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time // updates parent supervisor or pool about Process events - events events.Handler + events events.EventBus + eventsID string // state holds information about current Process state, // number of Process executions, buf status change time. @@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } + + eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), + created: time.Now(), + events: eb, + eventsID: id, + cmd: cmd, + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return w, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -85,11 +86,6 @@ func (w *Process) Created() time.Time { return w.created } -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. func (w *Process) State() State { @@ -139,6 +135,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") var err error err = w.cmd.Wait() + defer w.events.Unsubscribe(w.eventsID) // If worker was destroyed, just exit if w.State().Value() == StateDestroyed { @@ -187,9 +184,13 @@ func (w *Process) Stop() error { if err != nil { w.state.Set(StateKilling) _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) return errors.E(op, errors.Network, err) } + w.state.Set(StateStopped) + w.events.Unsubscribe(w.eventsID) return nil } @@ -201,6 +202,8 @@ func (w *Process) Kill() error { if err != nil { return err } + + w.events.Unsubscribe(w.eventsID) return nil } @@ -210,11 +213,13 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) + + w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p))) return len(p), nil } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 175972e0..d425994e 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -13,6 +14,10 @@ import ( "github.com/spiral/roadrunner/v2/worker_watcher/container/channel" ) +const ( + wwName string = "worker_watcher" +) + // Vector interface represents vector container type Vector interface { // Push used to put worker to the vector @@ -34,25 +39,28 @@ type workerWatcher struct { // used to control Destroy stage (that all workers are in the container) numWorkers *uint64 - workers []worker.BaseProcess + workers []worker.BaseProcess + events events.EventBus + eventsID string allocator worker.Allocator allocateTimeout time.Duration - events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher { + eb, id := events.Bus() ww := &workerWatcher{ container: channel.NewVector(numWorkers), + events: eb, + eventsID: id, // pass a ptr to the number of workers to avoid blocking in the TTL loop numWorkers: utils.Uint64(numWorkers), allocateTimeout: allocateTimeout, workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, - events: events, } return ww @@ -140,11 +148,7 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err))) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -168,11 +172,7 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Push( - events.WorkerEvent{ - Event: events.EventWorkerError, - Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err))) continue } @@ -234,6 +234,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { ww.container.Destroy() ww.Unlock() + ww.events.Unsubscribe(ww.eventsID) tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() for { //nolint:gosimple @@ -278,10 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Push(events.WorkerEvent{ - Event: events.EventWorkerWaitExit, - Payload: err, - }) + ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err))) } // remove worker @@ -289,7 +287,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid()))) + return } @@ -298,10 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Push(events.PoolEvent{ - Event: events.EventWorkerProcessExit, - Error: errors.E(op, err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err))) // no workers at all, panic if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { |