Diffstat (limited to 'docs/beep-beep/jobs.md')
-rw-r--r-- | docs/beep-beep/jobs.md | 1166 |
1 files changed, 1166 insertions, 0 deletions
diff --git a/docs/beep-beep/jobs.md b/docs/beep-beep/jobs.md new file mode 100644 index 00000000..3aa4e1c0 --- /dev/null +++ b/docs/beep-beep/jobs.md @@ -0,0 +1,1166 @@ +# Jobs + +Starting with RoadRunner >= 2.4, a queuing system (aka "jobs") is available. +This plugin allows you to move arbitrary "heavy" code into separate tasks to +execute them asynchronously in an external worker, which will be referred to +as "consumer" in this documentation. + +The RoadRunner PHP library provides both API implementations: The client one, +which allows you to dispatch tasks, and the server one, which provides the +consumer who processes the tasks. + +![queue](https://user-images.githubusercontent.com/2461257/128100380-2d4df71a-c86e-4d5d-a58e-a3d503349200.png) + +## Installation + +> **Requirements** +> - PHP >= 7.4 +> - RoadRunner >= 2.4 +> - *ext-protobuf (optional)* + +To get access from the PHP code, you should put the corresponding dependency +using [the Composer](https://getcomposer.org/). + +```sh +$ composer require spiral/roadrunner-jobs +``` + +## Configuration + +After installing all the required dependencies, you need to configure this +plugin. To enable it add `jobs` section to your configuration. + +For example, in this way, you can configure both the client and server parts to +work with RabbitMQ. + +```yaml +# +# RPC is required for tasks dispatching (client) +# +rpc: + listen: tcp://127.0.0.1:6001 + +# +# This section configures the task consumer (server) +# +server: + command: php consumer.php + relay: pipes + +# +# In this section, the jobs themselves are configured +# +jobs: + consume: [ "test" ] # List of RoadRunner queues that can be processed by + # the consumer specified in the "server" section. + pipelines: + test: # RoadRunner queue identifier + driver: memory # - Queue driver name + queue: test # - Internal (driver's) queue identifier +``` + +- The `rpc` section is responsible for client settings. 
It is at this address + that we will connect, *dispatching tasks* to the queue. + +- The `server` section is responsible for configuring the server. We have + already met its description when setting up the [PHP Worker](/php/worker.md). + +- And finally, the `jobs` section is responsible for the work of the queues + themselves. It contains information on how the RoadRunner should work with + connections to drivers, what can be handled by the consumer, and other + queue-specific settings. + +### Common Configuration + +Let's now focus on the common settings of the queue server. In full, it may +look like this: + +```yaml +jobs: + num_pollers: 64 + timeout: 60 + pipeline_size: 100000 + pool: + num_workers: 10 + allocate_timeout: 60s + destroy_timeout: 60s + consume: [ "queue-name" ] + pipelines: + queue-name: + driver: # "[DRIVER_NAME]" + # And driver-specific configuration below... +``` + +Above is a complete list of all possible common Jobs settings. Let's now figure +out what they are responsible for. + +- `num_pollers` - The number of threads that concurrently read from the priority + queue and send payloads to the workers. There is no optimal number; it + heavily depends on the PHP worker's performance. For example, "echo workers" + may process over 300k jobs per second with 64 pollers (on a 32-core CPU). + +- `timeout` - The internal timeout, applied via a Go context (in seconds). It + covers cases such as an interrupted connection, a push made in the middle of + a redial that may itself take 10 minutes (while this timeout is, say, 1 + minute), or a full queue. If the timeout is exceeded, your call will be + rejected with an error. Default: 60 (seconds). + +- `pipeline_size` - The size of the binary-heap priority queue (PQ). The priority + queue stores jobs ordered by their priorities. A priority might be set + for the job or inherited from the pipeline. If worker performance is poor, the PQ + will accumulate jobs until `pipeline_size` is reached.
After that, the PQ + will be blocked until the workers process all the jobs inside. + + A blocked PQ means that you can push the job into the driver, but RoadRunner + will not read that job until the PQ is empty. If RoadRunner is killed + with jobs inside the PQ, they won't be lost, because jobs are deleted from the + driver's queue only after Ack. + +- `pool` - All settings in this section are similar to the worker pool settings + described on the [configuration page](https://roadrunner.dev/docs/intro-config). + +- `consume` - Contains an array of the names of all queues specified in the + `"pipelines"` section, which should be processed by the consumer specified in + the global `"server"` section (see the [PHP worker's settings](/php/worker.md)). + +- `pipelines` - This section contains a list of all queues declared in the + RoadRunner. The key is a unique *queue identifier*, and the value is an object + with the settings specific to each driver (we will talk about it later). + +### Memory Driver + +This type of driver is already supported by the RoadRunner and does not require +any additional installations. + +Note that with this type of queue driver, all data is kept in memory and will be +destroyed when the RoadRunner Server is restarted. If you need a persistent +queue, then it is recommended to use alternative drivers: `amqp`, `beanstalk` +or `sqs`. + +The complete `memory` driver configuration looks like this: + +```yaml +jobs: + pipelines: + # User defined name of the queue. + example: + # Required section. + # Should be "memory" for the in-memory driver. + driver: memory + + # Optional section. + # Default: 10 + priority: 10 + + # Optional section. + # Default: 10 + prefetch: 10 +``` + +Below is a more detailed description of each of the in-memory-specific options: +- `priority` - Queue default priority for each task pushed into this queue + if the priority value for these tasks was not explicitly set.
+ +- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the + PQ size is set to 100 and prefetch to 100000, you'll be able to push up to + prefetch number of jobs even if PQ is full. + +> Please note that this driver cannot hold more than 1000 tasks with delay at +> the same time (RR limitation). + +### Local (based on the boltdb) Driver + +This type of driver is already supported by the RoadRunner and does not require +any additional installations. It uses boltdb as its main storage for the jobs. This driver should be used locally, for +testing or development purposes. It can be used in production, but this type of driver can't handle +a heavy load: its maximum throughput is roughly 30-50 RPS. + +Data in this driver persists in the boltdb database file. You can't open the same file simultaneously for two pipelines, or +for both the KV plugin and the Jobs plugin. This is a boltdb limitation on concurrent access to the same file from two processes. + +The complete `boltdb` driver configuration looks like this: + +```yaml + +boltdb: + permissions: 0777 + +jobs: + pipelines: + # User defined name of the queue. + example: + # Required section. + # Should be "boltdb" for the local driver. + driver: boltdb + + # Optional section. + # Default: 10 + priority: 10 + + # Optional section. + # Default: 1000 + prefetch: 1000 +``` + +Below is a more detailed description of each of the boltdb-specific options: +- `priority` - Queue default priority for each task pushed into this queue + if the priority value for these tasks was not explicitly set. + +- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the + PQ size is set to 100 and prefetch to 100000, you'll be able to push up to + prefetch number of jobs even if PQ is full. + +- `file` - The boltdb database file to use. May include a path: `foo/bar/rr1.db`. Default: `rr.db`.
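The `file` option described above does not appear in the sample configuration. Here is a minimal sketch of how it might be set, assuming it sits next to the other pipeline-level options. Both that placement and the pipeline name `local-example` are assumptions made for illustration; the path is the one mentioned above.

```yaml
jobs:
  pipelines:
    # Hypothetical pipeline name, used only for illustration.
    local-example:
      driver: boltdb
      priority: 10
      prefetch: 1000
      # Assumed placement of the `file` option; when omitted, rr.db is used.
      file: foo/bar/rr1.db
```

Because the same file cannot be opened by two pipelines at once, each boltdb pipeline would need its own database file.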
+ + +### AMQP Driver + +Strictly speaking, AMQP (and 0.9.1 version used) is a protocol, not a full-fledged driver, so you can use +any servers that support this protocol (on your own, only rabbitmq was tested) , such as: +[RabbitMQ](https://www.rabbitmq.com/), [Apache Qpid](http://qpid.apache.org/) or +[Apache ActiveMQ](http://activemq.apache.org/). However, it is recommended to +use RabbitMQ as the main implementation, and reliable performance with other +implementations is not guaranteed. + +To install and configure the RabbitMQ, use the corresponding +[documentation page](https://www.rabbitmq.com/download.html). After that, you +should configure the connection to the server in the "`amqp`" section. This +configuration section contains exactly one `addr` key with a +[connection DSN](https://www.rabbitmq.com/uri-spec.html). + +```yaml +amqp: + addr: amqp://guest:guest@localhost:5672 +``` + +After creating a connection to the server, you can create a new queue that will +use this connection and which will contain the queue settings (including +amqp-specific): + +```yaml +amqp: + addr: amqp://guest:guest@localhost:5672 + + +jobs: + pipelines: + # User defined name of the queue. + example: + # Required section. + # Should be "amqp" for the AMQP driver. + driver: amqp + + # Optional section. + # Default: 10 + priority: 10 + + # Optional section. + # Default: 100 + prefetch: 100 + + # Optional section. + # Default: "default" + queue: "default" + + # Optional section. + # Default: "amqp.default" + exchange: "amqp.default" + + # Optional section. + # Default: "direct" + exchange_type: "direct" + + # Optional section. + # Default: "" (empty) + routing_key: "" + + # Optional section. + # Default: false + exclusive: false + + # Optional section. + # Default: false + multiple_ack: false + + # Optional section. 
+ # Default: false + requeue_on_fail: false +``` + +Below is a more detailed description of each of the amqp-specific options: +- `priority` - Queue default priority for each task pushed into this queue + if the priority value for these tasks was not explicitly set. + +- `prefetch` - The client can request that messages be sent in advance so that + when the client finishes processing a message, the following message is + already held locally, rather than needing to be sent down the channel. + Prefetching gives a performance improvement. This field specifies the prefetch + window size in octets. See also ["prefetch-size"](https://www.rabbitmq.com/amqp-0-9-1-reference.html) + in AMQP QoS documentation reference. + +- `queue` - AMQP internal (inside the driver) queue name. + +- `exchange` - The name of the AMQP exchange to which tasks are sent. The exchange + distributes the tasks to one or more queues. It routes tasks to the queue + based on the bindings created between it and the queue. See also + ["AMQP model"](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model) + documentation section. + +- `exchange_type` - The type of task delivery. May be one of `direct`, `topics`, + `headers` or `fanout`. + - `direct` - Used when a task needs to be delivered to specific queues. The + task is published to an exchange with a specific routing key and goes to + all queues that are bound to this exchange with a matching routing + key. + - `topics` - Like the `direct` exchange, it enables selective routing by + comparing the routing key. But, in this case, the key is matched against a + pattern, like: `user.*.messages`. + - `fanout` - All tasks are delivered to all queues even if a routing key is + specified in the task. + - `headers` - Routes tasks to related queues based on a comparison of the + (key, value) pairs of the headers property of the binding and the similar + property of the message. + + - `routing_key` - Queue's routing key.
+ + - `exclusive` - Exclusive queues can't be redeclared. If set to true and + you'll try to declare the same pipeline twice, that will lead to an error. + + - `multiple_ack` - This delivery and all prior unacknowledged deliveries on + the same channel will be acknowledged. This is useful for batch processing + of deliveries. Applicable only for the Ack, not for the Nack. + + - `requeue_on_fail` - Requeue on Nack. + +### Beanstalk Driver + +Beanstalk is a simple and fast general purpose work queue. To install Beanstalk, +you can use the [local queue server](https://github.com/beanstalkd/beanstalkd) +or run the server inside [AWS Elastic](https://aws.amazon.com/elasticbeanstalk/). +You can choose any option that is convenient for you. + +Setting up the server is similar to setting up AMQP and requires specifying the +connection in the `"beanstalk"` section of your RoadRunner configuration file. + +```yaml +beanstalk: + addr: tcp://127.0.0.1:11300 +``` + +After setting up the connection, you can start using it. Let's take a look at +the complete config with all the options for this driver: + +```yaml +beanstalk: + # Optional section. + # Default: tcp://127.0.0.1:11300 + addr: tcp://127.0.0.1:11300 + + # Optional section. + # Default: 30s + timeout: 10s + +jobs: + pipelines: + # User defined name of the queue. + example: + # Required section. + # Should be "beanstalk" for the Beanstalk driver. + driver: beanstalk + + # Optional section. + # Default: 10 + priority: 10 + + # Optional section. + # Default: 1 + tube_priority: 1 + + # Optional section. + # Default: default + tube: default + + # Optional section. + # Default: 5s + reserve_timeout: 5s +``` + +These are all settings that are available to you for configuring this type of +driver. Let's take a look at what they are responsible for: +- `priority` - Similar to the same option in other drivers. 
This is the queue + default priority for each task pushed into this queue if the priority + value for these tasks was not explicitly set. + +- `tube_priority` - The value for specifying the priority within Beanstalk is + the internal priority of the server. The value should not exceed `int32` size. + +- `tube` - The name of the inner "tube" specific to the Beanstalk driver. + +### SQS Driver + +[Amazon SQS (Simple Queue Service)](https://aws.amazon.com/sqs/) is an +alternative queue server developed by Amazon as part of the AWS +service infrastructure. If you prefer to use the "cloud" option, then you can +use the [ready-made documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configuring.html) +for its installation. + +In addition to the possibility of using this queue server within the AWS, you +can also use the local installation of this system on your own servers. If you +prefer this option, then you can use [softwaremill's implementation](https://github.com/softwaremill/elasticmq) +of the Amazon SQS server. + +After you have created the SQS server, you need to specify the following +connection settings in the `sqs` configuration section. Unlike AMQP and Beanstalk, +SQS requires more values to set up a connection and will be different from what +we're used to: + +```yaml +sqs: + # Required AccessKey ID. + # Default: empty + key: access-key + + # Required secret access key. + # Default: empty + secret: api-secret + + # Required AWS region. + # Default: empty + region: us-west-1 + + # Required AWS session token. + # Default: empty + session_token: test + + # Required AWS SQS endpoint to connect. + # Default: http://127.0.0.1:9324 + endpoint: http://127.0.0.1:9324 +``` + +> Please note that although each of the sections contains default values, they are +> marked as "required". This means that in almost all cases they are required to +> be specified in order to correctly configure the driver.
+ +After you have configured the connection - you should configure the queue that +will use this connection: + +```yaml +sqs: + # SQS connection configuration... + +jobs: + pipelines: + # Required section. + # Should be "sqs" for the Amazon SQS driver. + driver: sqs + + # Optional section. + # Default: 10 + prefetch: 10 + + # Optional section. + # Default: 0 + visibility_timeout: 0 + + # Optional section. + # Default: 0 + wait_time_seconds: 0 + + # Optional section. + # Default: default + queue: default + + # Optional section. + # Default: empty + attributes: + DelaySeconds: 42 + # etc... see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html + + # Optional section. + # Default: empty + tags: + test: "tag" +``` + +Below is a more detailed description of each of the SQS-specific options: +- `prefetch` - Number of jobs to prefetch from the SQS. Amazon SQS never returns + more messages than this value (however, fewer messages might be returned). + Valid values: 1 to 10. Any number bigger than 10 will be rounded to 10. + Default: `10`. + +- `visibility_timeout` - The duration (in seconds) that the received messages + are hidden from subsequent retrieve requests after being retrieved by a + ReceiveMessage request. Max value is 43200 seconds (12 hours). Default: `0`. + +- `wait_time_seconds` - The duration (in seconds) for which the call waits for + a message to arrive in the queue before returning. If a message is available, + the call returns sooner than WaitTimeSeconds. If no messages are available and + the wait time expires, the call returns successfully with an empty list of + messages. Default: `5`. + +- `queue` - SQS internal queue name. Can contain alphanumeric characters, + hyphens (-), and underscores (_). Default value is `"default"` string. + +- `attributes` - List of the [AWS SQS attributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html). 
+> For example +> ```yaml +> attributes: +> DelaySeconds: 0 +> MaximumMessageSize: 262144 +> MessageRetentionPeriod: 345600 +> ReceiveMessageWaitTimeSeconds: 0 +> VisibilityTimeout: 30 +> ``` + +- `tags` - Tags don't have any semantic meaning. Amazon SQS interprets tags as + character strings. +> Please note that this functionality is rarely used and slows down the work of +> queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html + +## Client (Producer) + +Now that we have configured the server, we can start writing our first code for +sending the task to the queue. But before doing this, we need to connect to our +server. And to do this, it is enough to create a `Jobs` instance. + +```php +// Server Connection +$jobs = new Spiral\RoadRunner\Jobs\Jobs(); +``` + +Please note that in this case we have not specified any connection settings. And +this is really not required if this code is executed in a RoadRunner environment. +However, if the connection has to be established +from a third-party application (for example, a CLI command), then the settings +must be specified explicitly. + +```php +$jobs = new Spiral\RoadRunner\Jobs\Jobs( + // Expects RPC connection + Spiral\Goridge\RPC\RPC::create('tcp://127.0.0.1:6001') +); +``` + +After we have established the connection, we should check the server +availability and, in this case, the availability of the jobs API. This can be +done using the appropriate `isAvailable()` method. When the connection is +created, and the availability of the functionality is checked, we can connect to +the queue we need using the `connect()` method. + +```php +$jobs = new Spiral\RoadRunner\Jobs\Jobs(); + +if (!$jobs->isAvailable()) { + throw new LogicException('The server does not support "jobs" functionality =('); +} + +$queue = $jobs->connect('queue-name'); +``` + +### Task Creation + +Before submitting a task to the queue, you should create this task.
To create a +task, it is enough to call the corresponding `create()` method. + +```php +$task = $queue->create(SendEmailTask::class); +// Expected: +// object(Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface) +``` + +> Note that the name of the task does not have to be a class. Here we are using +> `SendEmailTask` just for convenience. + +Also, this method takes an additional second argument with additional data to +complete this task. + +```php +$task = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); +``` + +You can also use this task as a basis for creating several others. + +```php +$task = $queue->create(SendEmailTask::class); + +$first = $task->withValue('[email protected]'); +$second = $task->withValue('[email protected]'); +``` + +### Task Dispatching + +And to send tasks to the queue, we can use different methods: +`dispatch()` and `dispatchMany()`. The difference between these two +implementations is that the first one sends a task to the queue, returning a +dispatched task object, while the second one dispatches multiple tasks, +returning an array. Moreover, the second method provides one-time delivery of +all tasks in the array, as opposed to sending each task separately. + +```php +$a = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); +$b = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); + +foreach ([$a, $b] as $task) { + $result = $queue->dispatch($task); + // Expected: + // object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface) +} + +// Using a batching send +$result = $queue->dispatchMany($a, $b); +// Expected: +// array(2) { +// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface), +// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface) +// } +``` + +### Task Immediately Dispatching + +In the case that you do not want to create a new task and then immediately +dispatch it, you can simplify the work by using the `push` method. 
However, this +functionality has a number of limitations. When you create a task separately: +- You can flexibly configure additional task capabilities using a convenient + fluent interface. +- You can prepare a common task for several others and use it as a basis to + create several alternative tasks. +- You can create several different tasks and collect them into one collection + and send them to the queue at once (using the so-called batching). + +In the case of immediate dispatch, you will have access to only the basic +features: The `push()` method accepts one required argument with the +name of the task and two optional arguments containing additional data for the +task being performed and additional sending options (for example, a delay). +Moreover, this method is designed to send only one task. + +```php +use Spiral\RoadRunner\Jobs\Options; + +$payload = ['email' => $email, 'message' => $message]; + +$task = $queue->push(SendEmailTask::class, $payload, new Options( + delay: 60 // in seconds +)); +``` + +### Task Payload + +As you can see, each task, in addition to the name, can contain additional data +(payload) specific to a certain type of task. You decide what +data should be transferred to the task, and there is only one requirement: +since the task is sent to the queue, +the data must be serializable. + +> The default serializer used in jobs allows you to pass anonymous functions +> as well. + +To add additional data, you can use the optional second argument +provided by the `create()` and `push()` methods, or you can use the fluent +interface to supplement or modify the task data. Everything is quite simple +here; you can add data using the `withValue()` method, or delete it using the +`withoutValue()` method. + +The `withValue()` method takes the payload value as its +required first argument.
If you also need to specify a key for it, just pass it +as an optional second argument. + +```php +$task = $queue->create(CreateBackup::class) + ->withValue('/var/www') + ->withValue(42, 'answer') + ->withValue('/dev/null', 'output'); + +// This is equivalent to passing +// all this data at one time +$task = $queue->create(CreateBackup::class, [ + '/var/www', + 'answer' => 42, + 'output' => '/dev/null' +]); + +// On the other hand, we don't need an "answer"... +$task = $task->withoutValue('answer'); +``` + +### Task Headers + +In addition to the data itself, we can send additional metadata that is not +related to the payload of the task, that is, headers. In them, we can pass +any additional information, for example: Encoding of messages, their format, +the server's IP address, the user's token or session id, etc. + +Headers can only contain string values and are not serialized in any way during +transmission, so be careful when specifying them. + +To add a new header to the task, you can use methods +[similar to PSR-7](https://www.php-fig.org/psr/psr-7/). That is: +- `withHeader(string, iterable<string>|string): self` - Return an instance with + the provided value replacing the specified header. +- `withAddedHeader(string, iterable<string>|string): self` - Return an instance + with the specified header appended with the given value. +- `withoutHeader(string): self` - Return an instance without the specified header. + +```php +$task = $queue->create(RestartServer::class) + // Value first, key second + ->withValue('127.0.0.1', 'addr') + ->withAddedHeader('access-token', 'IDDQD'); + +$queue->dispatch($task); +``` + +### Task Delayed Dispatching + +If you want to specify that a job should not be immediately available for +processing by a jobs worker, you can use the delayed job option.
+For example, let's specify that a job shouldn't be available for processing +until 42 minutes after it has been dispatched: + +```php +$task = $queue->create(SendEmailTask::class) + ->withDelay(42 * 60); // 42 min * 60 sec +``` + +## Consumer Usage + +You probably already noticed that when [setting up a jobs consumer](#configuration), +the `"server"` configuration section is used in which a PHP file-handler is defined. +Exactly the same one we used earlier to write an [HTTP Worker](/php/worker.md). +Does this mean that if we want to use the Jobs Worker, then we can no longer +use the HTTP Worker? No, it does not! + +During the launch of the RoadRunner, it spawns several workers defined in the +`"server"` config section (by default, the number of workers is equal to the +number of CPU cores). At the same time, during the spawn of the workers, it +transmits in advance to each of them information about the *mode* in which this +worker will be used. The information about the *mode* itself is contained in the +environment variable `RR_MODE`: for the HTTP worker the value is +`"http"`, and for the Jobs worker the value is `"jobs"`. + +![queue-mode](https://user-images.githubusercontent.com/2461257/128106755-cb0d3cb7-3f98-433e-a1c7-1ed92839376a.png) + +There are several ways to check the operating mode from the code: +- By getting the value of the env variable. +- Or using the appropriate API method (from the `spiral/roadrunner-worker` package). + +The second choice may be preferable in cases where you need to change the +RoadRunner's mode, for example, in tests. + +```php +use Spiral\RoadRunner\Environment; +use Spiral\RoadRunner\Environment\Mode; + +// 1. Using global env variable +$isJobsMode = $_SERVER['RR_MODE'] === 'jobs'; + +// 2.
Using RoadRunner's API +$env = Environment::fromGlobals(); + +$isJobsMode = $env->getMode() === Mode::MODE_JOBS; +``` + +After we are convinced of the specialization of the worker, we can write the +corresponding code for processing tasks. To get information about the available +task in the worker, use the +`$consumer->waitTask(): ReceivedTaskInterface` method. + +```php +use Spiral\RoadRunner\Jobs\Consumer; +use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface; + + +$consumer = new Consumer(); + +/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */ +while ($task = $consumer->waitTask()) { + var_dump($task); +} +``` + +After you receive the task from the queue, you can start processing it in +accordance with the requirements. Don't worry about how much memory or time this +execution takes - the RoadRunner takes over the tasks of managing and +distributing tasks among the workers. + +After you have processed the incoming task, you can execute the +`complete(): void` method. After that, you tell the RoadRunner that you are +ready to handle the next task. + +```php +$consumer = new Spiral\RoadRunner\Jobs\Consumer(); + +while ($task = $consumer->waitTask()) { + + // + // Task handler code + // + + $task->complete(); +} +``` + +We got acquainted with the possibilities of receiving and processing tasks, but +we do not yet know what the received task is. Let's see what data it contains. + +### Task Failing + +In some cases, an error may occur during task processing. In this case, you +should use the `fail()` method, informing the RoadRunner about it. The method +takes two arguments. The first argument is required and expects any string or +string-like (instance of Stringable, for example any exception) value with an +error message. The second is optional and tells the server to restart this task. 
+ +```php +$consumer = new Spiral\RoadRunner\Jobs\Consumer(); +$shouldBeRestarted = false; + +while ($task = $consumer->waitTask()) { + try { + // + // Do something... + // + $task->complete(); + } catch (\Throwable $e) { + $task->fail($e, $shouldBeRestarted); + } +} +``` + +In the case that the next time you restart the task, you should update the +headers, you can use the appropriate method by adding or changing the headers +of the received task. + +```php +$task + ->withHeader('attempts', (int)$task->getHeaderLine('attempts') - 1) + ->withHeader('retry-delay', (int)$task->getHeaderLine('retry-delay') * 2) + ->fail('Something went wrong', requeue: true) +; +``` + +In addition, you can re-specify the task execution delay. For example, in the +code above, you may have noticed the use of a custom header `"retry-delay"`, the +value of which doubled after each restart, so this value can be used to specify +the delay in the next task execution. + +```php +$task + ->withDelay((int)$task->getHeaderLine('retry-delay')) + ->fail('Something went wrong', true) +; +``` + +### Received Task ID + +Each task in the queue has a **unique** identifier. This allows you to +unambiguously identify the task among all existing tasks in all queues, no +matter what name it was received from. + +In addition, it is worth paying attention to the fact that the identifier is not +a sequential number that increases indefinitely. It means that there is still a +chance of an identifier collision, but it is about 1/2.71 quintillion. Even if +you send 1 billion tasks per second, it will take you about 85 years for an ID +collision to occur. + +```php +echo $task->getId(); +// Expected Result +// string(36) "88ca6810-eab9-473d-a8fd-4b4ae457b7dc" +``` + +In the case that you want to store this identifier in the database, it is +recommended to use a binary representation (16 bytes long if your DB requires +blob sizes). 
+ +```php +$binary = hex2bin(str_replace('-', '', $task->getId())); +// Expected Result +// string(16) b"ˆÊh\x10ê¹G=¨ýKJäW·Ü" +``` + +### Received Task Queue + +Since a worker can process several different queues at once, you may need to +somehow determine from which queue the task came. To get the name of the queue, +use the `getQueue(): string` method. + +```php +echo $task->getQueue(); +// Expected +// string(13) "example-queue" +``` + +For example, you can select different task handlers based on different types of +queues. + +```php +// This is just an example of a handler +$handler = $container->get(match($task->getQueue()) { + 'emails' => 'email-handler', + 'billing' => 'billing-handler', + default => throw new InvalidArgumentException('Unprocessable queue [' . $task->getQueue() . ']') +}); + +$handler->process($task); +``` + +### Received Task Name + +The task name is some identifier associated with a specific type of task. For +example, it may contain the name of the task class so that in the future we can +create an object of this task by passing the required data there. To get the +name of the task, use the `getName(): string` method. + +```php +echo $task->getName(); +// Expected +// string(24) "App\Queue\Task\EmailTask" +``` + +Thus, we can implement the creation of a specific task with certain data for +this task. + +```php +$class = $task->getName(); + +if (!class_exists($class)) { + throw new InvalidArgumentException("Unprocessable task [$class]"); +} + +$handler->process($class::fromTask($task)); +``` + +### Received Task Payload + +Each task contains a set of arbitrary user data to be processed within the task. +To obtain this data, you can use one of the available methods: + +**getValue** + +Method `getValue()` returns a specific payload value by key or `null` if no +value was passed.
+If you want a different default value (for cases when no payload value with
+that key was passed), pass your own default value as the second argument.
+
+```php
+if ($task->getName() !== SendEmailTask::class) {
+    throw new InvalidArgumentException('Does not look like a mail task');
+}
+
+echo $task->getValue('email');             // "[email protected]"
+echo $task->getValue('username', 'Guest'); // "John"
+```
+
+**hasValue**
+
+To check the existence of any value in the payload, use the `hasValue()` method.
+This method will return `true` if the value for the payload was passed and `false`
+otherwise.
+
+```php
+if (!$task->hasValue('email')) {
+    throw new InvalidArgumentException('The "email" value is required for this task');
+}
+
+$email->sendTo($task->getValue('email'));
+```
+
+**getPayload**
+
+You can also get all the data at once in `array(string|int $key => mixed $value)`
+format using the `getPayload` method. This method is useful when transferring
+all the data to a DTO.
+
+```php
+$class = $task->getName();
+$arguments = $task->getPayload();
+
+$dto = new $class(...$arguments);
+```
+
+Note that the array can contain both `int` and `string` keys, so you should
+take care of passing them to the constructor correctly yourself. For example,
+the code above works with `string` keys only in PHP >= 8.0, where they are
+passed as named arguments. In earlier versions of the language, you should use
+the [reflection functionality](https://www.php.net/manual/ru/reflectionclass.newinstanceargs.php),
+or pass the payload in some other way.
+
+Since the handler process is not the one that put the task in the queue, any
+object you send to the queue is serialized and then automatically unpacked in
+the handler. The default serializer is suitable for most cases, so you can even
+pass `Closure` instances.
+However, for any specific data types, you should manage their packing and
+unpacking yourself, either by replacing the serializer completely or by
+handling a separate value. In this case, do not forget to specify this on both
+the client and the consumer side.
+
+### Received Task Headers
+
+If you need any additional information that is not related to the task itself,
+use headers.
+
+For example, headers can convey information about the serializer, encoding, or
+other metadata.
+
+```php
+$message = $task->getValue('message');
+$encoding = $task->getHeaderLine('encoding');
+
+if (strtolower($encoding) !== 'utf-8') {
+    $message = iconv($encoding, 'utf-8', $message);
+}
+```
+
+The interface for reading headers closely mirrors
+[PSR-7](https://www.php-fig.org/psr/psr-7/), so the following methods are
+available:
+- `getHeaders(): array<string, array<string, string>>` - Retrieves all task
+  header values.
+- `hasHeader(string): bool` - Checks if a header exists by the given name.
+- `getHeader(string): array<string, string>` - Retrieves a message header value
+  by the given name.
+- `getHeaderLine(string): string` - Retrieves a comma-separated string of the
+  values for a single header by the given name.
+
+We got acquainted with the data and capabilities that we have in the consumer.
+Let's now get down to the basics - sending these messages.
+
+## Advanced Functionality
+
+In addition to the main functionality for sending and processing tasks, the
+API has additional functionality that is not directly related to these tasks.
+Now that we have examined the main functionality, it's time to look at the
+advanced features.
+
+### Creating A New Queue
+
+In the very [first chapter](/beep-beep/jobs.md#configuration), we got acquainted
+with the queue settings and drivers for them.
+We can do almost the same thing from PHP code by calling the `create()` method
+on a `Jobs` instance.
+
+To create a new queue, the following types of DTO are available to you:
+
+- `Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo` for AMQP queues.
+- `Spiral\RoadRunner\Jobs\Queue\BeanstalkCreateInfo` for Beanstalk queues.
+- `Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo` for in-memory queues.
+- `Spiral\RoadRunner\Jobs\Queue\SQSCreateInfo` for SQS queues.
+
+Such a DTO with the appropriate settings should be passed to the `create()`
+method to create the corresponding queue:
+
+```php
+use Spiral\RoadRunner\Jobs\Jobs;
+use Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo;
+
+$jobs = new Jobs();
+
+//
+// Create a new "example" in-memory queue
+//
+$queue = $jobs->create(new MemoryCreateInfo(
+    name: 'example',
+    priority: 42,
+    prefetch: 10,
+));
+```
+
+### Getting A List Of Queues
+
+To get a list of all available queues, iterate over the `Jobs` object with the
+standard `foreach` operator. Each element of this collection corresponds to a
+specific queue registered in RoadRunner. To get just the number of available
+queues, pass the `Jobs` object to the `count()` function.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+
+foreach ($jobs as $queue) {
+    var_dump($queue->getName());
+    // Expects name of the queue
+}
+
+$count = count($jobs);
+// Expects the number of queues
+```
+
+### Pausing A Queue
+
+In addition to the ability to create new queues, there may be times when the
+processing of a queue needs to be suspended. Such cases can arise, for example,
+when deploying a new application: the processing of tasks should be suspended
+while the new application code is being deployed.
+
+In this case, the code is pretty simple. It is enough to call the `pause()`
+method, passing the names of the queues to it.
+To resume the queues (unpause), call the similar `resume()` method.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+
+// Pause "emails", "billing" and "backups" queues.
+$jobs->pause('emails', 'billing', 'backups');
+
+// Resuming only "emails" and "billing".
+$jobs->resume('emails', 'billing');
+```
+
+## RPC Interface
+
+All communication between PHP and Go is made via RPC calls with protobuf payloads.
+You can find versioned proto-payloads here: [Proto](https://github.com/spiral/roadrunner/blob/e9713a1d08a93e2be70c889c600ed89f54822b54/proto/jobs/v1beta).
+
+- `Push(in *jobsv1beta.PushRequest, out *jobsv1beta.Empty) error` - The
+  arguments: the first argument is a `PushRequest`, which contains one field
+  of the `Job` being sent to the queue; the second argument is `Empty`, which
+  means that the function does not return a result (returns nothing). An error
+  is returned if the request fails.
+
+- `PushBatch(in *jobsv1beta.PushBatchRequest, out *jobsv1beta.Empty) error` -
+  The arguments: the first argument is a `PushBatchRequest`, which contains one
+  repeated (list) field of the `Job` being sent to the queue; the second
+  argument is `Empty`, which means that the function does not return a result.
+  An error is returned if the request fails.
+
+- `Pause(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The arguments:
+  the first argument is a `Pipelines`, which contains one repeated (list)
+  field with the `string` names of the queues to be paused; the second
+  argument is `Empty`, which means that the function does not return a result.
+  An error is returned if the request fails.
+
+- `Resume(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The
+  arguments: the first argument is a `Pipelines`, which contains one
+  repeated (list) field with the `string` names of the queues to be resumed; the
+  second argument is `Empty`, which means that the function does not return a
+  result.
+  An error is returned if the request fails.
+
+- `List(in *jobsv1beta.Empty, out *jobsv1beta.Pipelines) error` - The
+  arguments: the first argument is an `Empty`, meaning that the function does
+  not accept anything (from the point of view of the PHP API, an empty string
+  should be passed); the second argument is `Pipelines`, which contains one
+  repeated (list) field with the `string` names of all available queues.
+  An error is returned if the request fails.
+
+- `Declare(in *jobsv1beta.DeclareRequest, out *jobsv1beta.Empty) error` - The
+  arguments: the first argument is a `DeclareRequest`, which contains one
+  `map<string, string>` pipeline field of queue configuration; the second
+  argument is `Empty`, which means that the function does not return a result.
+  An error is returned if the request fails.
+
+- `Stat(in *jobsv1beta.Empty, out *jobsv1beta.Stats) error` - The arguments:
+  the first argument is an `Empty`, meaning that the function does not accept
+  anything (from the point of view of the PHP API, an empty string should be
+  passed); the second argument is `Stats`, which contains one repeated (list)
+  field named `Stats` of type `Stat`. An error is returned if the request fails.
+
+
+From the PHP point of view, such requests (`List` for example) are as follows:
+```php
+use Spiral\Goridge\RPC\RPC;
+use Spiral\Goridge\RPC\Codec\ProtobufCodec;
+use Spiral\RoadRunner\Jobs\DTO\V1\Maintenance;
+
+$response = RPC::create('tcp://127.0.0.1:6001')
+    ->withServicePrefix('jobs')
+    ->withCodec(new ProtobufCodec())
+    ->call('List', '', Maintenance::class);
+```
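
The `Stat` call described in the list above can presumably be issued the same way as `List`. The following is only a minimal sketch, not the library's documented usage: the `Spiral\RoadRunner\Jobs\DTO\V1\Stats` class name and its `getStats()` accessor are assumptions derived from the `Stats` message and its repeated `Stats` field, and the RPC address is the one from the configuration example. It needs a running RoadRunner instance with the `jobs` plugin enabled.

```php
<?php

declare(strict_types=1);

use Spiral\Goridge\RPC\RPC;
use Spiral\Goridge\RPC\Codec\ProtobufCodec;
// Assumption: the generated DTO for the `Stats` message lives next to the
// `Maintenance` DTO used in the `List` example.
use Spiral\RoadRunner\Jobs\DTO\V1\Stats;

require __DIR__ . '/vendor/autoload.php';

$rpc = RPC::create('tcp://127.0.0.1:6001')
    ->withServicePrefix('jobs')
    ->withCodec(new ProtobufCodec());

// `Stat` accepts an `Empty` request, so an empty string is passed as payload.
$stats = $rpc->call('Stat', '', Stats::class);

// Assumption: `getStats()` is the accessor protobuf generates for the
// repeated `Stats` field; the returned repeated field is countable.
echo count($stats->getStats()), " queue(s) reported\n";
```

Reusing one `$rpc` instance for several calls avoids repeating the prefix and codec setup for every request.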