summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 22:50:03 +0300
committerGitHub <[email protected]>2021-10-27 22:50:03 +0300
commitc8c3f9f113eae13aa37cf92043b288bb0c68a622 (patch)
tree42f8ab386735d5f8b002907d07249e94b4c10a12
parent1f62e21020cc3014e9eb2dc33c154de6dd5b22d5 (diff)
parentab591e7f122e28857cef00c905a8125992ea3cdf (diff)
[#838]: feat(events): events package deep refactoringv2.6.0-alpha.1
[#838]: feat(events): events package deep refactoring
-rw-r--r--CHANGELOG.md352
-rw-r--r--Makefile2
-rw-r--r--events/docs/events.md0
-rw-r--r--events/events.go143
-rw-r--r--events/events_test.go94
-rw-r--r--events/eventsbus.go170
-rwxr-xr-xevents/general.go41
-rw-r--r--events/grpc_event.go39
-rw-r--r--events/init.go20
-rw-r--r--events/interface.go14
-rw-r--r--events/jobs_events.go81
-rw-r--r--events/pool_events.go71
-rw-r--r--events/types.go46
-rw-r--r--events/wildcard.go43
-rw-r--r--events/wildcard_test.go48
-rw-r--r--events/worker_events.go40
-rwxr-xr-xpool/static_pool.go68
-rwxr-xr-xpool/static_pool_test.go61
-rwxr-xr-xpool/supervisor_pool.go18
-rw-r--r--pool/supervisor_test.go21
-rw-r--r--tests/psr-worker-post.php (renamed from tests/worker-ok.php)7
-rw-r--r--tests/temporal-worker.php34
-rw-r--r--tests/worker-cors.php15
-rw-r--r--tests/worker-deny.php30
-rw-r--r--tests/worker-origin.php14
-rw-r--r--tests/worker-stop.php26
-rw-r--r--transport/interface.go5
-rwxr-xr-xtransport/pipe/pipe_factory.go9
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go48
-rwxr-xr-xtransport/pipe/pipe_factory_test.go46
-rwxr-xr-xtransport/socket/socket_factory.go9
-rw-r--r--transport/socket/socket_factory_spawn_test.go93
-rwxr-xr-xtransport/socket/socket_factory_test.go96
-rwxr-xr-xworker/worker.go43
-rwxr-xr-xworker_watcher/worker_watcher.go42
35 files changed, 1025 insertions, 864 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 380f4874..47bc475f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,13 @@
# CHANGELOG
-## v2.5.0 (20.10.2021)
+# v2.6.0 (-.-.2021)
+
+### ๐Ÿ‘€ New:
+
+- โœ๏ธ New internal message bus. Available globally. Supports wildcard subscriptions (for example: `http.*` will subscribe you to the all events coming from the `http` plugin). The subscriptions can be made from any RR plugin to any RR plugin.
+
+
+# v2.5.0 (20.10.2021)
# ๐Ÿ’” Breaking change:
@@ -12,9 +19,9 @@
```yaml
broadcast:
- default:
- driver: memory
- interval: 1
+ default:
+ driver: memory
+ interval: 1
```
### New style:
@@ -23,7 +30,7 @@ broadcast:
broadcast:
default:
driver: memory
- config: {} <--------------- NEW
+ config: { } <--------------- NEW
```
```yaml
@@ -37,8 +44,8 @@ kv:
memcached-rr:
driver: memcached
config: <--------------- NEW
- addr:
- - "127.0.0.1:11211"
+ addr:
+ - "127.0.0.1:11211"
broadcast:
default:
@@ -51,8 +58,11 @@ broadcast:
## ๐Ÿ‘€ New:
- โœ๏ธ **[BETA]** GRPC plugin update to v2.
-- โœ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. This is the new home for the roadrunner plugins with documentation, configuration samples, and common problems.
-- โœ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is the new configuration:
+- โœ๏ธ [Roadrunner-plugins](https://github.com/spiral/roadrunner-plugins) repository. This is the new home for the
+ roadrunner plugins with documentation, configuration samples, and common problems.
+- โœ๏ธ **[BETA]** Let's Encrypt support. RR now can obtain an SSL certificate/PK for your domain automatically. Here is
+ the new configuration:
+
```yaml
ssl:
# Host and port to listen on (eg.: `127.0.0.1:443`).
@@ -105,23 +115,25 @@ broadcast:
- โœ๏ธ Add a new option to the `logs` plugin to configure the line ending. By default, used `\n`.
**New option**:
+
```yaml
# Logs plugin settings
logs:
- (....)
- # Line ending
- #
- # Default: "\n".
- line_ending: "\n"
+ (....)
+ # Line ending
+ #
+ # Default: "\n".
+ line_ending: "\n"
```
- โœ๏ธ HTTP [Access log support](https://github.com/spiral/roadrunner-plugins/issues/34) at the `Info` log level.
+
```yaml
http:
address: 127.0.0.1:55555
max_request_size: 1024
access_logs: true <-------- Access Logs ON/OFF
- middleware: []
+ middleware: [ ]
pool:
num_workers: 2
@@ -129,13 +141,16 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
```
-- โœ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9).
- Middleware reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is smaller than 10MB, the middleware fits the buffer to the file size.
+
+- โœ๏ธ HTTP middleware to handle `X-Sendfile` [header](https://github.com/spiral/roadrunner-plugins/issues/9). Middleware
+ reads the file in 10MB chunks. So, for example for the 5Gb file, only 10MB of RSS will be used. If the file size is
+ smaller than 10MB, the middleware fits the buffer to the file size.
+
```yaml
http:
address: 127.0.0.1:44444
max_request_size: 1024
- middleware: ["sendfile"] <----- NEW MIDDLEWARE
+ middleware: [ "sendfile" ] <----- NEW MIDDLEWARE
pool:
num_workers: 2
@@ -145,6 +160,7 @@ http:
```
- โœ๏ธ Service plugin now supports env variables passing to the script/executable/binary/any like in the `server` plugin:
+
```yaml
service:
some_service_1:
@@ -152,21 +168,25 @@ service:
process_num: 1
exec_timeout: 5s # s,m,h (seconds, minutes, hours)
remain_after_exit: true
- env: <----------------- NEW
+ env: <----------------- NEW
foo: "BAR"
restart_sec: 1
```
- โœ๏ธ Server plugin can accept scripts (sh, bash, etc) in it's `command` configuration key:
+
```yaml
server:
- command: "./script.sh OR sh script.sh" <--- UPDATED
- relay: "pipes"
- relay_timeout: "20s"
+ command: "./script.sh OR sh script.sh" <--- UPDATED
+ relay: "pipes"
+ relay_timeout: "20s"
```
-The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can close `stdin`, `stdout` or `stderr`.
+
+The script should start a worker as the last command. For the `pipes`, scripts should not contain programs, which can
+close `stdin`, `stdout` or `stderr`.
- โœ๏ธ Nats jobs driver support - [PR](https://github.com/spiral/roadrunner-plugins/pull/68).
+
```yaml
nats:
addr: "demo.nats.io"
@@ -194,12 +214,14 @@ jobs:
consume: [ "test-1" ]
```
-- Driver uses NATS JetStream API and not compatible with non-js API.
+
+- Driver uses NATS JetStream API and is not compatible with non-js API.
-- โœ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a specified in the response queue.
- Limitations:
- - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations for the responses into the other queues (tubes, subjects).
+- โœ๏ธ Response API for the NATS, RabbitMQ, SQS and Beanstalk drivers. This means, that you'll be able to respond to a
+ specified in the response queue. Limitations:
+ - To send a response to the queue maintained by the RR, you should send it as a `Job` type. There are no limitations
+ for the responses into the other queues (tubes, subjects).
- Driver uses the same endpoint (address) to send the response as specified in the configuration.
## ๐Ÿฉน Fixes:
@@ -215,45 +237,53 @@ jobs:
- ๐Ÿ“ฆ roadrunner `v2.5.0`
- ๐Ÿ“ฆ roadrunner-plugins `v2.5.0`
- ๐Ÿ“ฆ roadrunner-temporal `v1.0.10`
-- ๐Ÿ“ฆ endure `v1.0.5`
+- ๐Ÿ“ฆ endure `v1.0.6`
- ๐Ÿ“ฆ goridge `v3.2.3`
## v2.4.1 (13.09.2021)
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: bug with not-idempotent call to the `attributes.Init`.
-- ๐Ÿ› Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the pipeline in the configuration.
+- ๐Ÿ› Fix: bug with not-idempotent call to the `attributes.Init`.
+- ๐Ÿ› Fix: memory jobs driver behavior. Now memory driver starts consuming automatically if the user consumes the
+ pipeline in the configuration.
## v2.4.0 (02.09.2021)
## ๐Ÿ’” Internal BC:
-- ๐Ÿ”จ Pool, worker interfaces: payload now passed and returned by the pointer.
+- ๐Ÿ”จ Pool, worker interfaces: payload now passed and returned by the pointer.
## ๐Ÿ‘€ New:
-- โœ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime. Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
-- โœ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`, `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
-- โœ๏ธ Support for the Docker images via GitHub packages.
-- โœ๏ธ Go 1.17 support for the all spiral packages.
+- โœ๏ธ Long-awaited, reworked `Jobs` plugin with pluggable drivers. Now you can allocate/destroy pipelines in the runtime.
+ Drivers included in the initial release: `RabbitMQ (0-9-1)`, `SQS v2`, `beanstalk`, `memory` and local queue powered
+ by the `boltdb`. [PR](https://github.com/spiral/roadrunner/pull/726)
+- โœ๏ธ Support for the IPv6 (`tcp|http(s)|empty [::]:port`, `tcp|http(s)|empty [::1]:port`
+ , `tcp|http(s)|empty :// [0:0:0:0:0:0:0:1]:port`) for RPC, HTTP and other
+ plugins. [RFC](https://datatracker.ietf.org/doc/html/rfc2732#section-2)
+- โœ๏ธ Support for the Docker images via GitHub packages.
+- โœ๏ธ Go 1.17 support for the all spiral packages.
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: fixed bug with goroutines waiting on the internal worker's container channel, [issue](https://github.com/spiral/roadrunner/issues/750).
-- ๐Ÿ› Fix: RR become unresponsive when new workers failed to re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772).
-- ๐Ÿ› Fix: add `debug` pool config key to the `.rr.yaml` configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79).
+- ๐Ÿ› Fix: fixed bug with goroutines waiting on the internal worker's container
+ channel, [issue](https://github.com/spiral/roadrunner/issues/750).
+- ๐Ÿ› Fix: RR become unresponsive when new workers failed to
+ re-allocate, [issue](https://github.com/spiral/roadrunner/issues/772).
+- ๐Ÿ› Fix: add `debug` pool config key to the `.rr.yaml`
+ configuration [reference](https://github.com/spiral/roadrunner-binary/issues/79).
## ๐Ÿ“ฆ Packages:
-- ๐Ÿ“ฆ Update goridge to `v3.2.1`
-- ๐Ÿ“ฆ Update temporal to `v1.0.9`
-- ๐Ÿ“ฆ Update endure to `v1.0.4`
+- ๐Ÿ“ฆ Update goridge to `v3.2.1`
+- ๐Ÿ“ฆ Update temporal to `v1.0.9`
+- ๐Ÿ“ฆ Update endure to `v1.0.4`
## ๐Ÿ“ˆ Summary:
-- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1)
-- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1)
+- RR Milestone [2.4.0](https://github.com/spiral/roadrunner/milestone/29?closed=1)
+- RR-Binary Milestone [2.4.0](https://github.com/spiral/roadrunner-binary/milestone/10?closed=1)
---
@@ -261,13 +291,13 @@ jobs:
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: Do not call the container's Stop method after the container stopped by an error.
-- ๐Ÿ› Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749)
-- ๐Ÿ› Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749)
+- ๐Ÿ› Fix: Do not call the container's Stop method after the container stopped by an error.
+- ๐Ÿ› Fix: Bug with ttl incorrectly handled by the worker [PR](https://github.com/spiral/roadrunner/pull/749)
+- ๐Ÿ› Fix: Add `RR_BROADCAST_PATH` to the `websockets` plugin [PR](https://github.com/spiral/roadrunner/pull/749)
## ๐Ÿ“ˆ Summary:
-- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1)
+- RR Milestone [2.3.2](https://github.com/spiral/roadrunner/milestone/31?closed=1)
---
@@ -275,32 +305,32 @@ jobs:
## ๐Ÿ‘€ New:
-- โœ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc`
- folder. [PR](https://github.com/spiral/roadrunner/pull/732)
-- โœ๏ธ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)
+- โœ๏ธ Rework `broadcast` plugin. Add architecture diagrams to the `doc`
+ folder. [PR](https://github.com/spiral/roadrunner/pull/732)
+- โœ๏ธ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit
- reached [PR](https://github.com/spiral/roadrunner/pull/738)
-- ๐Ÿ› Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next
- request [PR](https://github.com/spiral/roadrunner/pull/738)
-- ๐Ÿ› Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717)
- , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
-- ๐Ÿ› Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
-- ๐Ÿ› Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
-- ๐Ÿ› Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727)
+- ๐Ÿ› Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit
+ reached [PR](https://github.com/spiral/roadrunner/pull/738)
+- ๐Ÿ› Fix: Bug with healthcheck endpoint when workers were marked as invalid and stay is that state until next
+ request [PR](https://github.com/spiral/roadrunner/pull/738)
+- ๐Ÿ› Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717)
+ , [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
+- ๐Ÿ› Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
+- ๐Ÿ› Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
+- ๐Ÿ› Fix: Bug, incorrect request `origin` check [Bug](https://github.com/spiral/roadrunner/issues/727)
## ๐Ÿ“ฆ Packages:
-- ๐Ÿ“ฆ Update goridge to `v3.1.4`
-- ๐Ÿ“ฆ Update temporal to `v1.0.8`
+- ๐Ÿ“ฆ Update goridge to `v3.1.4`
+- ๐Ÿ“ฆ Update temporal to `v1.0.8`
## ๐Ÿ“ˆ Summary:
-- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1)
-- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1)
-- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1)
+- RR Milestone [2.3.1](https://github.com/spiral/roadrunner/milestone/30?closed=1)
+- Temporal Milestone [1.0.8](https://github.com/temporalio/roadrunner-temporal/milestone/11?closed=1)
+- Goridge Milestone [3.1.4](https://github.com/spiral/goridge/milestone/11?closed=1)
---
@@ -308,36 +338,36 @@ jobs:
## ๐Ÿ‘€ New:
-- โœ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
- thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
- on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
-- โœ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the
- hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
-- โœ๏ธ Json-schemas for the config file v1.0 (it also registered
- in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
-- โœ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
-- โœ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error
- code. [Issue](https://github.com/spiral/roadrunner/issues/659)
-- โœ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration)
- . [Issue](https://github.com/spiral/roadrunner/issues/489)
-- โœ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh`
- scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
-- โœ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation)
- , [Issue](https://github.com/spiral/roadrunner/issues/545)
+- โœ๏ธ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
+ thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
+ on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
+- โœ๏ธ Protobuf binary messages for the `websockets` and `kv` RPC calls under the
+ hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
+- โœ๏ธ Json-schemas for the config file v1.0 (it also registered
+ in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
+- โœ๏ธ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
+- โœ๏ธ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error
+ code. [Issue](https://github.com/spiral/roadrunner/issues/659)
+- โœ๏ธ Expose HTTP plugin metrics (workers memory, requests count, requests duration)
+ . [Issue](https://github.com/spiral/roadrunner/issues/489)
+- โœ๏ธ Scan `server.command` and find errors related to the wrong path to a `PHP` file, or `.ph`, `.sh`
+ scripts. [Issue](https://github.com/spiral/roadrunner/issues/658)
+- โœ๏ธ Support file logger with log rotation [Wiki](https://en.wikipedia.org/wiki/Log_rotation)
+ , [Issue](https://github.com/spiral/roadrunner/issues/545)
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
-- ๐Ÿ› Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in
- logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
-- ๐Ÿ› Fix: Error message will be properly shown in the log in case of `SoftJob`
- error: [Bug](https://github.com/spiral/roadrunner/issues/691)
-- ๐Ÿ› Fix: Wrong applied middlewares for the `fcgi` server leads to the
- NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
+- ๐Ÿ› Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
+- ๐Ÿ› Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in
+ logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
+- ๐Ÿ› Fix: Error message will be properly shown in the log in case of `SoftJob`
+ error: [Bug](https://github.com/spiral/roadrunner/issues/691)
+- ๐Ÿ› Fix: Wrong applied middlewares for the `fcgi` server leads to the
+ NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
## ๐Ÿ“ฆ Packages:
-- ๐Ÿ“ฆ Update goridge to `v3.1.0`
+- ๐Ÿ“ฆ Update goridge to `v3.1.0`
---
@@ -345,9 +375,9 @@ jobs:
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: revert static plugin. It stays as a separate plugin on the main route (`/`) and supports all the previously
- announced features.
-- ๐Ÿ› Fix: remove `build` and other old targets from the Makefile.
+- ๐Ÿ› Fix: revert static plugin. It stays as a separate plugin on the main route (`/`) and supports all the previously
+ announced features.
+- ๐Ÿ› Fix: remove `build` and other old targets from the Makefile.
---
@@ -355,21 +385,21 @@ jobs:
## ๐Ÿ‘€ New:
-- โœ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate
- file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach`
- , `If-None-Match`, `If-Range`, `Last-Modified`
- and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag`
- , `weak` and `pattern`.
+- โœ๏ธ Reworked `static` plugin. Now, it does not affect the performance of the main route and persist on the separate
+ file server (within the `http` plugin). Looong awaited feature: `Etag` (+ weak Etags) as well with the `If-Mach`
+ , `If-None-Match`, `If-Range`, `Last-Modified`
+ and `If-Modified-Since` tags supported. Static plugin has a bunch of new options such as: `allow`, `calculate_etag`
+ , `weak` and `pattern`.
- ### Option `always` was deleted from the plugin.
+ ### Option `always` was deleted from the plugin.
-- โœ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime.
+- โœ๏ธ Update `informer.List` implementation. Now it returns a list with the all available plugins in the runtime.
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: issue with wrong ordered middlewares (reverse). Now the order is correct.
-- ๐Ÿ› Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option.
-- ๐Ÿ› Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic).
+- ๐Ÿ› Fix: issue with wrong ordered middlewares (reverse). Now the order is correct.
+- ๐Ÿ› Fix: issue when RR fails if a user sets `debug` mode with the `exec_ttl` supervisor option.
+- ๐Ÿ› Fix: uniform log levels. Use everywhere the same levels (warn, error, debug, info, panic).
---
@@ -377,102 +407,102 @@ jobs:
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: issue with endure provided wrong logger interface implementation.
+- ๐Ÿ› Fix: issue with endure provided wrong logger interface implementation.
## v2.1.0 (27.04.2021)
## ๐Ÿ‘€ New:
-- โœ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service)
-- โœ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers.
+- โœ๏ธ New `service` plugin. Docs: [link](https://roadrunner.dev/docs/beep-beep-service)
+- โœ๏ธ Stabilize `kv` plugin with `boltdb`, `in-memory`, `memcached` and `redis` drivers.
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented.
-- ๐Ÿ› Fix: http handler was without log listener after `rr reset`.
+- ๐Ÿ› Fix: Logger didn't provide an anonymous log instance to a plugins w/o `Named` interface implemented.
+- ๐Ÿ› Fix: http handler was without log listener after `rr reset`.
## v2.0.4 (06.04.2021)
## ๐Ÿ‘€ New:
-- โœ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam).
-- โœ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI
- flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam).
-- ๐Ÿ“œ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger
- severity level should be at least `INFO`).
-- ๐Ÿ†• Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there
- are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state.
+- โœ๏ธ Add support for `linux/arm64` platform for docker image (thanks @tarampampam).
+- โœ๏ธ Add dotenv file support (`.env` in working directory by default; file location can be changed using CLI
+ flag `--dotenv` or `DOTENV_PATH` environment variable) (thanks @tarampampam).
+- ๐Ÿ“œ Add a new `raw` mode for the `logger` plugin to keep the stderr log message of the worker unmodified (logger
+ severity level should be at least `INFO`).
+- ๐Ÿ†• Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there
+ are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state.
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: bug with the temporal worker which does not follow general graceful shutdown period.
+- ๐Ÿ› Fix: bug with the temporal worker which does not follow general graceful shutdown period.
## v2.0.3 (29.03.2021)
## ๐Ÿฉน Fixes:
-- ๐Ÿ› Fix: slow last response when reached `max_jobs` limit.
+- ๐Ÿ› Fix: slow last response when reached `max_jobs` limit.
## v2.0.2 (06.04.2021)
-- ๐Ÿ› Fix: Bug with required Root CA certificate for the SSL, now it's optional.
-- ๐Ÿ› Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop).
-- ๐Ÿ†• New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at
- the `Info` log level.
-- โšก New: Builds for the Mac with the M1 processor (arm64).
-- ๐Ÿ‘ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small
- improvements.
+- ๐Ÿ› Fix: Bug with required Root CA certificate for the SSL, now it's optional.
+- ๐Ÿ› Fix: Bug with incorrectly consuming metrics collector from the RPC calls (thanks @dstrop).
+- ๐Ÿ†• New: HTTP/FCGI/HTTPS internal logs instead of going to the raw stdout will be displayed in the RR logger at
+ the `Info` log level.
+- โšก New: Builds for the Mac with the M1 processor (arm64).
+- ๐Ÿ‘ท Rework ServeHTTP handler logic. Use http.Error instead of writing code directly to the response writer. Other small
+ improvements.
## v2.0.1 (09.03.2021)
-- ๐Ÿ› Fix: incorrect PHP command validation
-- ๐Ÿ› Fix: ldflags properly inject RR version
-- โฌ†๏ธ Update: README, links to the go.pkg from v1 to v2
-- ๐Ÿ“ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16
-- ๐Ÿ“ฆ Bump Endure container to v1.0.0.
+- ๐Ÿ› Fix: incorrect PHP command validation
+- ๐Ÿ› Fix: ldflags properly inject RR version
+- โฌ†๏ธ Update: README, links to the go.pkg from v1 to v2
+- ๐Ÿ“ฆ Bump golang version in the Dockerfile and in the `go.mod` to 1.16
+- ๐Ÿ“ฆ Bump Endure container to v1.0.0.
## v2.0.0 (02.03.2021)
-- โœ”๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin.
-- ๐Ÿ†• New plugin system with auto-recovery, easier plugin API.
-- ๐Ÿ“œ New `logger` plugin to configure logging for each plugin individually.
-- ๐Ÿ” Up to 50% performance increase in HTTP workloads.
-- โœ”๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale.
-- โœ”๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior).
-- โŒ Eliminate `limit` service, now each worker pool includes `supervisor` configuration.
-- ๐Ÿ†• New resetter, informer plugins to perform hot reloads and observe loggers in a system.
-- ๐Ÿ’ซ Expose more HTTP plugin configuration options.
-- ๐Ÿ†• Headers, static and gzip services now located in HTTP config.
-- ๐Ÿ†• Ability to configure the middleware sequence.
-- ๐Ÿ’ฃ Faster Goridge protocol (eliminated 50% of syscalls).
-- ๐Ÿ’พ Add support for binary payloads for RPC (`msgpack`).
-- ๐Ÿ†• Server no longer stops when a PHP worker dies (attempts to restart).
-- ๐Ÿ’พ New RR binary server downloader.
-- ๐Ÿ’ฃ Echoing no longer breaks execution (yay!).
-- ๐Ÿ†• Migration to ZapLogger instead of Logrus.
-- ๐Ÿ’ฅ RR can no longer stuck when studding down with broken tasks in a pipeline.
-- ๐Ÿงช More tests, more static analysis.
-- ๐Ÿ’ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins.
+- โœ”๏ธ Add a shared server to create PHP worker pools instead of isolated worker pool in each individual plugin.
+- ๐Ÿ†• New plugin system with auto-recovery, easier plugin API.
+- ๐Ÿ“œ New `logger` plugin to configure logging for each plugin individually.
+- ๐Ÿ” Up to 50% performance increase in HTTP workloads.
+- โœ”๏ธ Add **[Temporal Workflow](https://temporal.io)** plugin to run distributed computations on scale.
+- โœ”๏ธ Add `debug` flag to reload PHP worker ahead of a request (emulates PHP-FPM behavior).
+- โŒ Eliminate `limit` service, now each worker pool includes `supervisor` configuration.
+- ๐Ÿ†• New resetter, informer plugins to perform hot reloads and observe loggers in a system.
+- ๐Ÿ’ซ Expose more HTTP plugin configuration options.
+- ๐Ÿ†• Headers, static and gzip services now located in HTTP config.
+- ๐Ÿ†• Ability to configure the middleware sequence.
+- ๐Ÿ’ฃ Faster Goridge protocol (eliminated 50% of syscalls).
+- ๐Ÿ’พ Add support for binary payloads for RPC (`msgpack`).
+- ๐Ÿ†• Server no longer stops when a PHP worker dies (attempts to restart).
+- ๐Ÿ’พ New RR binary server downloader.
+- ๐Ÿ’ฃ Echoing no longer breaks execution (yay!).
+- ๐Ÿ†• Migration to ZapLogger instead of Logrus.
+- ๐Ÿ’ฅ RR can no longer stuck when studding down with broken tasks in a pipeline.
+- ๐Ÿงช More tests, more static analysis.
+- ๐Ÿ’ฅ Create a new foundation for new KV, WebSocket, GRPC and Queue plugins.
## v2.0.0-RC.4 (20.02.2021)
-- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550).
-- Endure container update to v1.0.0-RC.2 version.
-- Remove unneeded mutex from the `http.Workers` method.
-- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557).
-- Add static, headers, status, gzip plugins to the `main.go`.
-- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error.
+- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550).
+- Endure container update to v1.0.0-RC.2 version.
+- Remove unneeded mutex from the `http.Workers` method.
+- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557).
+- Add static, headers, status, gzip plugins to the `main.go`.
+- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error.
## v2.0.0-RC.3 (17.02.2021)
-- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config)
-- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development`
-- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. Reference:
- ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph)
+- Add support for the overwriting `.rr.yaml` keys with values (ref: https://roadrunner.dev/docs/intro-config)
+- Make logger plugin optional to define in the config. Default values: level -> `debug`, mode -> `development`
+- Add the ability to read env variables from the `.rr.yaml` in the form of: `rpc.listen: {RPC_ADDR}`. Reference:
+ ref: https://roadrunner.dev/docs/intro-config (Environment Variables paragraph)
## v2.0.0-RC.2 (11.02.2021)
-- Update RR to version v2.0.0-RC.2
-- Update Temporal plugin to version v2.0.0-RC.1
-- Update Goridge to version v3.0.1
-- Update Endure to version v1.0.0-RC.1
+- Update RR to version v2.0.0-RC.2
+- Update Temporal plugin to version v2.0.0-RC.1
+- Update Goridge to version v3.0.1
+- Update Endure to version v1.0.0-RC.1
diff --git a/Makefile b/Makefile
index 9c800ea4..d2f4ebc6 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,7 @@ test_coverage:
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pq.out -covermode=atomic ./priority_queue
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker_stack.out -covermode=atomic ./worker_watcher
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/events.out -covermode=atomic ./events
echo 'mode: atomic' > ./coverage-ci/summary.txt
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt
@@ -25,3 +26,4 @@ test: ## Run application tests
go test -v -race -tags=debug ./worker_watcher
go test -v -race -tags=debug ./bst
go test -v -race -tags=debug ./priority_queue
+ go test -v -race -tags=debug ./events
diff --git a/events/docs/events.md b/events/docs/events.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/events/docs/events.md
diff --git a/events/events.go b/events/events.go
new file mode 100644
index 00000000..b7396653
--- /dev/null
+++ b/events/events.go
@@ -0,0 +1,143 @@
+package events
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/events_test.go b/events/events_test.go
new file mode 100644
index 00000000..e15c55d6
--- /dev/null
+++ b/events/events_test.go
@@ -0,0 +1,94 @@
+package events
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestEvenHandler(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
+
+func TestEvenHandler2(t *testing.T) {
+ eh, id := Bus()
+ eh2, id2 := Bus()
+ defer eh.Unsubscribe(id)
+ defer eh2.Unsubscribe(id2)
+
+ ch := make(chan Event, 100)
+ ch2 := make(chan Event, 100)
+ err := eh2.SubscribeP(id2, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ err = eh.SubscribeP(id, "http.EventJobOK", ch2)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch2
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+
+ l := eh.Len()
+ require.Equal(t, uint(2), l)
+
+ eh.Unsubscribe(id)
+ time.Sleep(time.Second)
+
+ l = eh.Len()
+ require.Equal(t, uint(1), l)
+
+ eh2.Unsubscribe(id2)
+ time.Sleep(time.Second)
+
+ l = eh.Len()
+ require.Equal(t, uint(0), l)
+}
+
+func TestEvenHandler3(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "EventJobOK", ch)
+ require.Error(t, err)
+}
+
+func TestEvenHandler4(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ err := eh.SubscribeP(id, "EventJobOK", nil)
+ require.Error(t, err)
+}
+
+func TestEvenHandler5(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
diff --git a/events/eventsbus.go b/events/eventsbus.go
new file mode 100644
index 00000000..cd0dca71
--- /dev/null
+++ b/events/eventsbus.go
@@ -0,0 +1,170 @@
+package events
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "github.com/spiral/errors"
+)
+
+type sub struct {
+ pattern string
+ w *wildcard
+ events chan<- Event
+}
+
+type eventsBus struct {
+ sync.RWMutex
+ subscribers sync.Map
+ internalEvCh chan Event
+ stop chan struct{}
+}
+
+func newEventsBus() *eventsBus {
+ return &eventsBus{
+ internalEvCh: make(chan Event, 100),
+ stop: make(chan struct{}),
+ }
+}
+
+/*
+http.* <-
+*/
+
+// SubscribeAll for all RR events
+// returns subscriptionID
+func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+
+ if subIDTr == "" {
+ return errors.Str("subscriberID can't be empty")
+ }
+
+ return eb.subscribe(subID, "*", ch)
+}
+
+// SubscribeP pattern like "pluginName.EventType"
+func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+ patternTr := strings.Trim(pattern, " ")
+
+ if subIDTr == "" || patternTr == "" {
+ return errors.Str("subscriberID or pattern can't be empty")
+ }
+
+ return eb.subscribe(subID, pattern, ch)
+}
+
+func (eb *eventsBus) Unsubscribe(subID string) {
+ eb.subscribers.Delete(subID)
+}
+
+func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ eb.Lock()
+ defer eb.Unlock()
+
+ sbArr := sb.([]*sub)
+
+ for i := 0; i < len(sbArr); i++ {
+ if sbArr[i].pattern == pattern {
+ sbArr[i] = sbArr[len(sbArr)-1]
+ sbArr = sbArr[:len(sbArr)-1]
+ // replace with new array
+ eb.subscribers.Store(subID, sbArr)
+ return
+ }
+ }
+ }
+}
+
+// Send sends event to the events bus
+func (eb *eventsBus) Send(ev Event) {
+ // do not accept nil events
+ if ev == nil {
+ return
+ }
+
+ eb.internalEvCh <- ev
+}
+
+func (eb *eventsBus) Len() uint {
+ var ln uint
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ ln++
+ return true
+ })
+
+ return ln
+}
+
+func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
+ eb.Lock()
+ defer eb.Unlock()
+ w, err := newWildcard(pattern)
+ if err != nil {
+ return err
+ }
+
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ // at this point we are confident that sb is a []*sub type
+ subArr := sb.([]*sub)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+ }
+
+ subArr := make([]*sub, 0, 5)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+}
+
+func (eb *eventsBus) handleEvents() {
+ for { //nolint:gosimple
+ select {
+ case ev := <-eb.internalEvCh:
+ // http.WorkerError for example
+ wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ vsub := value.([]*sub)
+
+ for i := 0; i < len(vsub); i++ {
+ if vsub[i].w.match(wc) {
+ select {
+ case vsub[i].events <- ev:
+ return true
+ default:
+ return true
+ }
+ }
+ }
+
+ return true
+ })
+ }
+ }
+}
diff --git a/events/general.go b/events/general.go
deleted file mode 100755
index 5cf13e10..00000000
--- a/events/general.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package events
-
-import (
- "sync"
-)
-
-const UnknownEventType string = "Unknown event type"
-
-// HandlerImpl helps to broadcast events to multiple listeners.
-type HandlerImpl struct {
- listeners []Listener
- sync.RWMutex // all receivers should be pointers
-}
-
-func NewEventsHandler() Handler {
- return &HandlerImpl{listeners: make([]Listener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *HandlerImpl) NumListeners() int {
- eb.Lock()
- defer eb.Unlock()
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener Listener) {
- eb.Lock()
- defer eb.Unlock()
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcast events across all event listeners.
-func (eb *HandlerImpl) Push(e interface{}) {
- // ReadLock here because we are not changing listeners
- eb.RLock()
- defer eb.RUnlock()
- for k := range eb.listeners {
- eb.listeners[k](e)
- }
-}
diff --git a/events/grpc_event.go b/events/grpc_event.go
deleted file mode 100644
index 31ff4957..00000000
--- a/events/grpc_event.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package events
-
-import (
- "time"
-
- "google.golang.org/grpc"
-)
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk G = iota + 13000
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-)
-
-type G int64
-
-func (ev G) String() string {
- switch ev {
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type GRPCEvent struct {
- Event G
- // Info contains unary call info.
- Info *grpc.UnaryServerInfo
- // Error associated with event.
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/init.go b/events/init.go
new file mode 100644
index 00000000..25e92fc5
--- /dev/null
+++ b/events/init.go
@@ -0,0 +1,20 @@
+package events
+
+import (
+ "sync"
+
+ "github.com/google/uuid"
+)
+
+var evBus *eventsBus
+var onInit = &sync.Once{}
+
+func Bus() (*eventsBus, string) {
+ onInit.Do(func() {
+ evBus = newEventsBus()
+ go evBus.handleEvents()
+ })
+
+ // return events bus with subscriberID
+ return evBus, uuid.NewString()
+}
diff --git a/events/interface.go b/events/interface.go
deleted file mode 100644
index 7d57e4d0..00000000
--- a/events/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package events
-
-// Handler interface
-type Handler interface {
- // NumListeners return number of active listeners
- NumListeners() int
- // AddListener adds lister to the publisher
- AddListener(listener Listener)
- // Push pushes event to the listeners
- Push(e interface{})
-}
-
-// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
-type Listener func(event interface{})
diff --git a/events/jobs_events.go b/events/jobs_events.go
deleted file mode 100644
index f65ede67..00000000
--- a/events/jobs_events.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package events
-
-import (
- "time"
-)
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK J = iota + 12000
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-)
-
-type J int64
-
-func (ev J) String() string {
- switch ev {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type JobEvent struct {
- Event J
- // String is job id.
- ID string
- // Pipeline name
- Pipeline string
- // Associated driver name (amqp, ephemeral, etc)
- Driver string
- // Error for the jobs/pipes errors
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/pool_events.go b/events/pool_events.go
deleted file mode 100644
index eb28df6a..00000000
--- a/events/pool_events.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package events
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct P = iota + 10000
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-)
-
-type P int64
-
-func (ev P) String() string {
- switch ev {
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
- }
- return UnknownEventType
-}
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event P
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
- Error error
-}
diff --git a/events/types.go b/events/types.go
new file mode 100644
index 00000000..65a76d15
--- /dev/null
+++ b/events/types.go
@@ -0,0 +1,46 @@
+package events
+
+type EventBus interface {
+ SubscribeAll(subID string, ch chan<- Event) error
+ SubscribeP(subID string, pattern string, ch chan<- Event) error
+ Unsubscribe(subID string)
+ UnsubscribeP(subID, pattern string)
+ Len() uint
+ Send(ev Event)
+}
+
+type Event interface {
+ Plugin() string
+ Type() EventType
+ Message() string
+}
+
+type RREvent struct {
+ // event typ
+ typ EventType
+ // plugin
+ plugin string
+ // message
+ message string
+}
+
+// NewEvent initializes new event
+func NewEvent(t EventType, plugin string, msg string) *RREvent {
+ return &RREvent{
+ typ: t,
+ plugin: plugin,
+ message: msg,
+ }
+}
+
+func (r *RREvent) Type() EventType {
+ return r.typ
+}
+
+func (r *RREvent) Message() string {
+ return r.message
+}
+
+func (r *RREvent) Plugin() string {
+ return r.plugin
+}
diff --git a/events/wildcard.go b/events/wildcard.go
new file mode 100644
index 00000000..b4c28ae1
--- /dev/null
+++ b/events/wildcard.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+ "strings"
+
+ "github.com/spiral/errors"
+)
+
+type wildcard struct {
+ prefix string
+ suffix string
+}
+
+func newWildcard(pattern string) (*wildcard, error) {
+ // Normalize
+ origin := strings.ToLower(pattern)
+ i := strings.IndexByte(origin, '*')
+
+ /*
+ http.*
+ *
+ *.WorkerError
+ */
+ if i == -1 {
+ dotI := strings.IndexByte(pattern, '.')
+
+ if dotI == -1 {
+ // http.SuperEvent
+ return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*")
+ }
+
+ return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil
+ }
+
+ // pref: http.
+ // suff: *
+ return &wildcard{origin[0:i], origin[i+1:]}, nil
+}
+
+func (w wildcard) match(s string) bool {
+ s = strings.ToLower(s)
+ return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
+}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
new file mode 100644
index 00000000..230ef673
--- /dev/null
+++ b/events/wildcard_test.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWildcard(t *testing.T) {
+ w, err := newWildcard("http.*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ // *.* -> *
+ w, err = newWildcard("*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.True(t, w.match("https.SuperEvent"))
+ assert.True(t, w.match(""))
+ assert.True(t, w.match("*"))
+ assert.True(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ w, err = newWildcard("*.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+}
diff --git a/events/worker_events.go b/events/worker_events.go
deleted file mode 100644
index 6b80df61..00000000
--- a/events/worker_events.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package events
-
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError W = iota + 11000
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-type W int64
-
-func (ev W) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
- }
- return UnknownEventType
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event W
-
- // Worker triggered the event.
- Worker interface{}
-
- // Event specific payload.
- Payload interface{}
-}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 91bd1c2c..11112e72 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"os/exec"
"time"
@@ -14,8 +15,12 @@ import (
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
)
-// StopRequest can be sent by worker to indicate that restart is required.
-const StopRequest = "{\"stop\":true}"
+const (
+ // StopRequest can be sent by worker to indicate that restart is required.
+ StopRequest = `{"stop":true}`
+ // pluginName ...
+ pluginName = "pool"
+)
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
@@ -34,11 +39,8 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- // distributes the events
- events events.Handler
-
- // saved list of event listeners
- listeners []events.Listener
+ events events.EventBus
+ eventsID string
// manages worker states and TTLs
ww Watcher
@@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
+ eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: events.NewEventsHandler(),
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: eb,
+ eventsID: id,
}
// add pool options
@@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *StaticPool) {
- p.listeners = listeners
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
-// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.Listener) {
- sp.events.AddListener(listener)
-}
-
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
@@ -205,7 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid())))
}
}
@@ -227,7 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)})
+ sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err)))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -238,6 +228,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -246,12 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err)))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -272,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -280,7 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -296,7 +287,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
if err != nil {
return nil, err
}
@@ -304,10 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Push(events.PoolEvent{
- Event: events.EventWorkerConstruct,
- Payload: sw,
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid())))
return sw, nil
}
}
@@ -329,7 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
return nil, err
}
@@ -346,7 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
}
return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 9861f0d8..717d301e 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -18,6 +18,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfg = &Config{
@@ -167,26 +168,18 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -196,22 +189,23 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ event := <-ch
+ if !strings.Contains(event.Message(), "undefined_function()") {
+ t.Fatal("event should contain undefiled function()")
+ }
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
+
// Run pool events
- ev := make(chan struct{}, 1)
- listener := func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- ev <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
+ require.NoError(t, err)
var cfg2 = &Config{
NumWorkers: 1,
@@ -224,7 +218,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
cfg2,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -242,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ev
+ <-ch
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -250,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ev
+ <-ch
list := p.Workers()
for _, w := range list {
@@ -496,15 +489,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventNoFreeWorkers {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
@@ -518,7 +508,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
DestroyTimeout: time.Second,
Supervisor: nil,
},
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -532,7 +521,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ <-ch
p.Destroy(ctx)
}
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 99af168c..1a94f6a0 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "fmt"
"sync"
"time"
@@ -12,7 +13,10 @@ import (
"github.com/spiral/roadrunner/v2/worker"
)
-const MB = 1024 * 1024
+const (
+ MB = 1024 * 1024
+ supervisorName string = "supervisor"
+)
// NSEC_IN_SEC nanoseconds in second
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
@@ -25,16 +29,16 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
- events events.Handler
+ events events.EventBus
pool Pool
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
sp := &supervised{
cfg: cfg,
- events: events,
+ events: eb,
pool: pool,
mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
@@ -148,7 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -168,7 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -223,7 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index aca379c6..eb3c37dd 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -265,6 +265,11 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
assert.Empty(t, resp.Context)
time.Sleep(time.Second * 2)
+
+ if len(p.Workers()) < 1 {
+ t.Fatal("should be at least 1 worker")
+ return
+ }
// should be destroyed, state should be Ready, not Invalid
assert.NotEqual(t, pid, p.Workers()[0].Pid())
assert.Equal(t, int64(1), p.Workers()[0].State().Value())
@@ -326,14 +331,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventMaxMemory {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
+ require.NoError(t, err)
// constructed
// max memory
@@ -344,7 +346,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
cfgExecTTL,
- AddListeners(listener),
)
assert.NoError(t, err)
@@ -359,7 +360,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
- <-block
+ <-ch
p.Destroy(context.Background())
}
diff --git a/tests/worker-ok.php b/tests/psr-worker-post.php
index 63558b0f..2f54af5b 100644
--- a/tests/worker-ok.php
+++ b/tests/psr-worker-post.php
@@ -1,14 +1,16 @@
<?php
+
/**
* @var Goridge\RelayInterface $relay
*/
+
use Spiral\Goridge;
use Spiral\RoadRunner;
ini_set('display_errors', 'stderr');
require __DIR__ . "/vendor/autoload.php";
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$worker = RoadRunner\Worker::create();
$psr7 = new RoadRunner\Http\PSR7Worker(
$worker,
new \Nyholm\Psr7\Factory\Psr17Factory(),
@@ -19,7 +21,8 @@ $psr7 = new RoadRunner\Http\PSR7Worker(
while ($req = $psr7->waitRequest()) {
try {
$resp = new \Nyholm\Psr7\Response();
- $resp->getBody()->write($_SERVER['RR_BROADCAST_PATH'] ?? '');
+ $resp->getBody()->write((string) $req->getBody());
+
$psr7->respond($resp);
} catch (\Throwable $e) {
$psr7->getWorker()->error((string)$e);
diff --git a/tests/temporal-worker.php b/tests/temporal-worker.php
deleted file mode 100644
index 5c9c80e6..00000000
--- a/tests/temporal-worker.php
+++ /dev/null
@@ -1,34 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-require __DIR__ . '/vendor/autoload.php';
-
-/**
- * @param string $dir
- * @return array<string>
- */
-$getClasses = static function (string $dir): iterable {
- $files = glob($dir . '/*.php');
-
- foreach ($files as $file) {
- yield substr(basename($file), 0, -4);
- }
-};
-
-$factory = \Temporal\WorkerFactory::create();
-
-$worker = $factory->newWorker('default');
-
-// register all workflows
-foreach ($getClasses(__DIR__ . '/src/Workflow') as $name) {
- $worker->registerWorkflowTypes('Temporal\\Tests\\Workflow\\' . $name);
-}
-
-// register all activity
-foreach ($getClasses(__DIR__ . '/src/Activity') as $name) {
- $class = 'Temporal\\Tests\\Activity\\' . $name;
- $worker->registerActivityImplementations(new $class);
-}
-
-$factory->run();
diff --git a/tests/worker-cors.php b/tests/worker-cors.php
deleted file mode 100644
index ea3c986c..00000000
--- a/tests/worker-cors.php
+++ /dev/null
@@ -1,15 +0,0 @@
-<?php
-
-use Spiral\RoadRunner\Worker;
-use Spiral\RoadRunner\Http\HttpWorker;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . '/vendor/autoload.php';
-
-$http = new HttpWorker(Worker::create());
-
-while ($req = $http->waitRequest()) {
- $http->respond(200, 'Response', [
- 'Access-Control-Allow-Origin' => ['*']
- ]);
-}
diff --git a/tests/worker-deny.php b/tests/worker-deny.php
deleted file mode 100644
index 6dc993f6..00000000
--- a/tests/worker-deny.php
+++ /dev/null
@@ -1,30 +0,0 @@
-<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . "/vendor/autoload.php";
-
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-$psr7 = new RoadRunner\Http\PSR7Worker(
- $worker,
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory()
-);
-
-while ($req = $psr7->waitRequest()) {
- try {
- $resp = new \Nyholm\Psr7\Response();
- if ($req->getAttribute('ws:joinServer')) {
- $psr7->respond($resp->withStatus(200));
- } else {
- $psr7->respond($resp->withStatus(401));
- }
- } catch (\Throwable $e) {
- $psr7->getWorker()->error((string)$e);
- }
-}
diff --git a/tests/worker-origin.php b/tests/worker-origin.php
deleted file mode 100644
index 6ce4de59..00000000
--- a/tests/worker-origin.php
+++ /dev/null
@@ -1,14 +0,0 @@
-<?php
-
-use Spiral\RoadRunner\Worker;
-use Spiral\RoadRunner\Http\HttpWorker;
-
-require __DIR__ . '/vendor/autoload.php';
-
-$http = new HttpWorker(Worker::create());
-
-while ($req = $http->waitRequest()) {
- $http->respond(200, 'Response', [
- 'Access-Control-Allow-Origin' => ['*']
- ]);
-}
diff --git a/tests/worker-stop.php b/tests/worker-stop.php
deleted file mode 100644
index 83fc5710..00000000
--- a/tests/worker-stop.php
+++ /dev/null
@@ -1,26 +0,0 @@
-<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-ini_set('display_errors', 'stderr');
-require __DIR__ . "/vendor/autoload.php";
-
-$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-$psr7 = new RoadRunner\Http\PSR7Worker(
- $worker,
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory(),
- new \Nyholm\Psr7\Factory\Psr17Factory()
-);
-
-while ($req = $psr7->waitRequest()) {
- try {
- $resp = new \Nyholm\Psr7\Response();
- $psr7->respond($resp->withAddedHeader('stop', 'we-dont-like-you')->withStatus(401));
- } catch (\Throwable $e) {
- $psr7->getWorker()->error((string)$e);
- }
-}
diff --git a/transport/interface.go b/transport/interface.go
index e20f2b0b..0d6c8e8b 100644
--- a/transport/interface.go
+++ b/transport/interface.go
@@ -4,7 +4,6 @@ import (
"context"
"os/exec"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/worker"
)
@@ -12,10 +11,10 @@ import (
type Factory interface {
// SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
- SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error)
+ SpawnWorker(*exec.Cmd) (*worker.Process, error)
// Close the factory and underlying connections.
Close() error
}
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 3ea8fd98..c70b3f65 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -5,7 +5,6 @@ import (
"os/exec"
"github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
)
@@ -27,10 +26,10 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
spCh := make(chan sr)
go func() {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case spCh <- sr{
@@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 45b7aef8..256176de 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState2(t *testing.T) {
@@ -105,21 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid2(t *testing.T) {
@@ -368,17 +369,14 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -390,11 +388,11 @@ func Test_Broken2(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index b4ba8c87..0f527cd5 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState(t *testing.T) {
@@ -125,22 +126,21 @@ func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid(t *testing.T) {
@@ -433,17 +433,14 @@ func Test_Broken(t *testing.T) {
t.Parallel()
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -455,11 +452,10 @@ func Test_Broken(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index dfffdf4e..06d7000d 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
@@ -83,12 +82,12 @@ type socketSpawn struct {
}
// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
c := make(chan socketSpawn)
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case c <- socketSpawn{
@@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 363a3510..2db2fd40 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start2(t *testing.T) {
@@ -110,21 +111,20 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -162,18 +162,13 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -198,7 +193,11 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function() string")
+ }
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -273,21 +272,20 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -331,18 +329,13 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -367,7 +360,11 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index d517d026..7b28a847 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start(t *testing.T) {
@@ -124,21 +125,20 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -203,18 +203,13 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -239,7 +234,11 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Tcp_Echo(t *testing.T) {
@@ -368,21 +367,20 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout(t *testing.T) {
@@ -444,20 +442,13 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -481,7 +472,12 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
+
wg.Wait()
}
diff --git a/worker/worker.go b/worker/worker.go
index 38a1e9ac..05c6dd0d 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -12,18 +12,24 @@ import (
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
)
type Options func(p *Process)
+const (
+ workerEventsName string = "worker"
+)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
// updates parent supervisor or pool about Process events
- events events.Handler
+ events events.EventBus
+ eventsID string
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
+
+ eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
+ created: time.Now(),
+ events: eb,
+ eventsID: id,
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return w, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -85,11 +86,6 @@ func (w *Process) Created() time.Time {
return w.created
}
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
func (w *Process) State() State {
@@ -139,6 +135,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
var err error
err = w.cmd.Wait()
+ defer w.events.Unsubscribe(w.eventsID)
// If worker was destroyed, just exit
if w.State().Value() == StateDestroyed {
@@ -187,9 +184,13 @@ func (w *Process) Stop() error {
if err != nil {
w.state.Set(StateKilling)
_ = w.cmd.Process.Signal(os.Kill)
+
+ w.events.Unsubscribe(w.eventsID)
return errors.E(op, errors.Network, err)
}
+
w.state.Set(StateStopped)
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -201,6 +202,8 @@ func (w *Process) Kill() error {
if err != nil {
return err
}
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -210,11 +213,13 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
return len(p), nil
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 175972e0..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,6 +2,7 @@ package worker_watcher //nolint:stylecheck
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
@@ -13,6 +14,10 @@ import (
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
)
+const (
+ wwName string = "worker_watcher"
+)
+
// Vector interface represents vector container
type Vector interface {
// Push used to put worker to the vector
@@ -34,25 +39,28 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
+ workers []worker.BaseProcess
+ events events.EventBus
+ eventsID string
allocator worker.Allocator
allocateTimeout time.Duration
- events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ eb, id := events.Bus()
ww := &workerWatcher{
container: channel.NewVector(numWorkers),
+ events: eb,
+ eventsID: id,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
- events: events,
}
return ww
@@ -140,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -168,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Push(
- events.WorkerEvent{
- Event: events.EventWorkerError,
- Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
continue
}
@@ -234,6 +234,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
ww.container.Destroy()
ww.Unlock()
+ ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
for { //nolint:gosimple
@@ -278,10 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Push(events.WorkerEvent{
- Event: events.EventWorkerWaitExit,
- Payload: err,
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
}
// remove worker
@@ -289,7 +287,8 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
+
return
}
@@ -298,10 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Push(events.PoolEvent{
- Event: events.EventWorkerProcessExit,
- Error: errors.E(op, err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {