Diffstat (limited to 'docs/beep-beep/jobs.md')
-rw-r--r-- | docs/beep-beep/jobs.md | 1166 |
1 files changed, 0 insertions, 1166 deletions
diff --git a/docs/beep-beep/jobs.md b/docs/beep-beep/jobs.md deleted file mode 100644 index 3aa4e1c0..00000000 --- a/docs/beep-beep/jobs.md +++ /dev/null @@ -1,1166 +0,0 @@ -# Jobs - -Starting with RoadRunner >= 2.4, a queuing system (aka "jobs") is available. -This plugin allows you to move arbitrary "heavy" code into separate tasks to -execute them asynchronously in an external worker, which will be referred to -as "consumer" in this documentation. - -The RoadRunner PHP library provides both API implementations: the client one, -which allows you to dispatch tasks, and the server one, which provides the -consumer that processes the tasks. - -![queue](https://user-images.githubusercontent.com/2461257/128100380-2d4df71a-c86e-4d5d-a58e-a3d503349200.png) - -## Installation - -> **Requirements** -> - PHP >= 7.4 -> - RoadRunner >= 2.4 -> - *ext-protobuf (optional)* - -To get access from the PHP code, you should install the corresponding dependency -using [Composer](https://getcomposer.org/). - -```sh -$ composer require spiral/roadrunner-jobs -``` - -## Configuration - -After installing all the required dependencies, you need to configure this -plugin. To enable it, add the `jobs` section to your configuration. - -For example, in this way, you can configure both the client and server parts to -work with the in-memory driver. - -```yaml -# -# RPC is required for tasks dispatching (client) -# -rpc: - listen: tcp://127.0.0.1:6001 - -# -# This section configures the task consumer (server) -# -server: - command: php consumer.php - relay: pipes - -# -# In this section, the jobs themselves are configured -# -jobs: - consume: [ "test" ] # List of RoadRunner queues that can be processed by - # the consumer specified in the "server" section. - pipelines: - test: # RoadRunner queue identifier - driver: memory # - Queue driver name - queue: test # - Internal (driver's) queue identifier -``` - -- The `rpc` section is responsible for client settings. 
It is at this address - that we will connect, *dispatching tasks* to the queue. - -- The `server` section is responsible for configuring the server. Previously, we - have already met with its description when setting up the [PHP Worker](/php/worker.md). - -- And finally, the `jobs` section is responsible for the work of the queues - themselves. It contains information on how the RoadRunner should work with - connections to drivers, what can be handled by the consumer, and other - queue-specific settings. - -### Common Configuration - -Let's now focus on the common settings of the queue server. In full, it may -look like this: - -```yaml -jobs: - num_pollers: 64 - timeout: 60 - pipeline_size: 100000 - pool: - num_workers: 10 - allocate_timeout: 60s - destroy_timeout: 60s - consume: [ "queue-name" ] - pipelines: - queue-name: - driver: # "[DRIVER_NAME]" - # And driver-specific configuration below... -``` - -Above is a complete list of all possible common Jobs settings. Let's now figure -out what they are responsible for. - -- `num_pollers` - The number of threads that concurrently read from the priority - queue and send payloads to the workers. There is no optimal number, it's - heavily dependent on the PHP worker's performance. For example, "echo workers" - may process over 300k jobs per second within 64 pollers (on 32 core CPU). - -- `timeout` - The internal timeouts via golang context (in seconds). For - example, if the connection was interrupted or your push in the middle of the - redial state with 10 minutes timeout (but our timeout is 1 min for example), - or queue is full. If the timeout exceeds, your call will be rejected with an - error. Default: 60 (seconds). - -- `pipeline_size` - The "binary heaps" priority queue (PQ) settings. Priority - queue stores jobs inside according to its' priorities. Priority might be set - for the job or inherited by the pipeline. If worker performance is poor, PQ - will accumulate jobs until `pipeline_size` will be reached. 
After that, PQ - will be blocked until workers process all the jobs inside. - - A blocked PQ means that you can still push the job into the driver, but RoadRunner - will not read that job until the PQ is empty. If RoadRunner is killed - with jobs inside the PQ, they won't be lost, because jobs are deleted from the - driver's queue only after Ack. - -- `pool` - All settings in this section are similar to the worker pool settings - described on the [configuration page](https://roadrunner.dev/docs/intro-config). - -- `consume` - Contains an array of the names of all queues specified in the - `"pipelines"` section, which should be processed by the consumer specified in - the global `"server"` section (see the [PHP worker's settings](/php/worker.md)). - -- `pipelines` - This section contains a list of all queues declared in the - RoadRunner. The key is a unique *queue identifier*, and the value is an object - with the settings specific to each driver (we will talk about it later). - -### Memory Driver - -This type of driver is already supported by the RoadRunner and does not require -any additional installations. - -Note that with this type of queue driver, all data is kept in memory and will be -destroyed when the RoadRunner Server is restarted. If you need a persistent -queue, then it is recommended to use alternative drivers: `amqp`, `beanstalk` -or `sqs`. - -The complete `memory` driver configuration looks like this: - -```yaml -jobs: - pipelines: - # User defined name of the queue. - example: - # Required section. - # Should be "memory" for the in-memory driver. - driver: memory - - # Optional section. - # Default: 10 - priority: 10 - - # Optional section. - # Default: 10 - prefetch: 10 -``` - -Below is a more detailed description of each of the in-memory-specific options: -- `priority` - Queue default priority for each task pushed into this queue - if the priority value for these tasks was not explicitly set. 
- -- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the - PQ size is set to 100 and prefetch to 100000, you'll be able to push up to - prefetch number of jobs even if PQ is full. - -> Please note that this driver cannot hold more than 1000 tasks with delay at -> the same time (RR limitation). - -### Local (based on the boltdb) Driver - -This type of driver is already supported by the RoadRunner and does not require -any additional installations. It uses boltdb as its main storage for the jobs. This driver should be used locally, for -testing or development purposes. It can be used in production, but this type of driver can't handle -a heavy load: its maximum throughput is about 30-50 RPS. - -Data in this driver persists in the boltdb database file. You can't open the same file simultaneously for two pipelines or -for the KV plugin and the Jobs plugin. This is a boltdb limitation on concurrent access to the same file from two processes. - -The complete `boltdb` driver configuration looks like this: - -```yaml - -boltdb: - permissions: 0777 - -jobs: - pipelines: - # User defined name of the queue. - example: - # Required section. - # Should be "boltdb" for the local driver. - driver: boltdb - - # Optional section. - # Default: 10 - priority: 10 - - # Optional section. - # Default: 1000 - prefetch: 1000 -``` - -Below is a more detailed description of each of the boltdb-specific options: -- `priority` - Queue default priority for each task pushed into this queue - if the priority value for these tasks was not explicitly set. - -- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the - PQ size is set to 100 and prefetch to 100000, you'll be able to push up to - prefetch number of jobs even if PQ is full. - -- `file` - boltdb database file to use. May include a path, for example `foo/bar/rr1.db`. Default: `rr.db`. 
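The `file` option is described in the list but does not appear in the sample configuration. A minimal sketch of two boltdb pipelines follows, assuming `file` sits at the same level as `priority` and `prefetch`. That placement, the pipeline names and the file paths are illustrative assumptions, not taken from the original sample.

```yaml
boltdb:
  permissions: 0777

jobs:
  consume: [ "local-emails", "local-reports" ]
  pipelines:
    # Hypothetical pipeline names, chosen for illustration only.
    local-emails:
      driver: boltdb
      file: data/emails.db    # assumed placement of the `file` option
      priority: 10
      prefetch: 1000

    local-reports:
      driver: boltdb
      file: data/reports.db   # a separate file per pipeline
      priority: 10
      prefetch: 1000
```

Each pipeline points at its own database file because, as noted in the driver description, the same boltdb file cannot be opened by two pipelines at once.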
- - -### AMQP Driver - -Strictly speaking, AMQP (and 0.9.1 version used) is a protocol, not a full-fledged driver, so you can use -any servers that support this protocol (on your own, only rabbitmq was tested) , such as: -[RabbitMQ](https://www.rabbitmq.com/), [Apache Qpid](http://qpid.apache.org/) or -[Apache ActiveMQ](http://activemq.apache.org/). However, it is recommended to -use RabbitMQ as the main implementation, and reliable performance with other -implementations is not guaranteed. - -To install and configure the RabbitMQ, use the corresponding -[documentation page](https://www.rabbitmq.com/download.html). After that, you -should configure the connection to the server in the "`amqp`" section. This -configuration section contains exactly one `addr` key with a -[connection DSN](https://www.rabbitmq.com/uri-spec.html). - -```yaml -amqp: - addr: amqp://guest:guest@localhost:5672 -``` - -After creating a connection to the server, you can create a new queue that will -use this connection and which will contain the queue settings (including -amqp-specific): - -```yaml -amqp: - addr: amqp://guest:guest@localhost:5672 - - -jobs: - pipelines: - # User defined name of the queue. - example: - # Required section. - # Should be "amqp" for the AMQP driver. - driver: amqp - - # Optional section. - # Default: 10 - priority: 10 - - # Optional section. - # Default: 100 - prefetch: 100 - - # Optional section. - # Default: "default" - queue: "default" - - # Optional section. - # Default: "amqp.default" - exchange: "amqp.default" - - # Optional section. - # Default: "direct" - exchange_type: "direct" - - # Optional section. - # Default: "" (empty) - routing_key: "" - - # Optional section. - # Default: false - exclusive: false - - # Optional section. - # Default: false - multiple_ack: false - - # Optional section. 
- # Default: false - requeue_on_fail: false -``` - -Below is a more detailed description of each of the amqp-specific options: -- `priority` - Queue default priority for each task pushed into this queue - if the priority value for these tasks was not explicitly set. - -- `prefetch` - The client can request that messages be sent in advance so that - when the client finishes processing a message, the following message is - already held locally, rather than needing to be sent down the channel. - Prefetching gives a performance improvement. This field specifies the prefetch - window size in octets. See also ["prefetch-size"](https://www.rabbitmq.com/amqp-0-9-1-reference.html) - in AMQP QoS documentation reference. - -- `queue` - AMQP internal (inside the driver) queue name. - -- `exchange` - The name of the AMQP exchange to which tasks are sent. The exchange - distributes the tasks to one or more queues. It routes tasks to the queue - based on the created bindings between it and the queue. See also - ["AMQP model"](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model) - documentation section. - -- `exchange_type` - The type of task delivery. May be one of `direct`, `topic`, - `headers` or `fanout`. - - `direct` - Used when a task needs to be delivered to specific queues. The - task is published to an exchange with a specific routing key and goes to - all queues that are bound to this exchange with a matching routing - key. - - `topic` - Like the `direct` exchange, it enables selective routing by - comparing the routing key. But, in this case, the key is matched against a - pattern, like: `user.*.messages`. - - `fanout` - All tasks are delivered to all queues even if a routing key is - specified in the task. - - `headers` - Routes tasks to related queues based on a comparison of the - (key, value) pairs of the headers property of the binding and the similar - property of the message. - - - `routing_key` - Queue's routing key. 
- - - `exclusive` - Exclusive queues can't be redeclared. If set to true and - you'll try to declare the same pipeline twice, that will lead to an error. - - - `multiple_ack` - This delivery and all prior unacknowledged deliveries on - the same channel will be acknowledged. This is useful for batch processing - of deliveries. Applicable only for the Ack, not for the Nack. - - - `requeue_on_fail` - Requeue on Nack. - -### Beanstalk Driver - -Beanstalk is a simple and fast general purpose work queue. To install Beanstalk, -you can use the [local queue server](https://github.com/beanstalkd/beanstalkd) -or run the server inside [AWS Elastic](https://aws.amazon.com/elasticbeanstalk/). -You can choose any option that is convenient for you. - -Setting up the server is similar to setting up AMQP and requires specifying the -connection in the `"beanstalk"` section of your RoadRunner configuration file. - -```yaml -beanstalk: - addr: tcp://127.0.0.1:11300 -``` - -After setting up the connection, you can start using it. Let's take a look at -the complete config with all the options for this driver: - -```yaml -beanstalk: - # Optional section. - # Default: tcp://127.0.0.1:11300 - addr: tcp://127.0.0.1:11300 - - # Optional section. - # Default: 30s - timeout: 10s - -jobs: - pipelines: - # User defined name of the queue. - example: - # Required section. - # Should be "beanstalk" for the Beanstalk driver. - driver: beanstalk - - # Optional section. - # Default: 10 - priority: 10 - - # Optional section. - # Default: 1 - tube_priority: 1 - - # Optional section. - # Default: default - tube: default - - # Optional section. - # Default: 5s - reserve_timeout: 5s -``` - -These are all settings that are available to you for configuring this type of -driver. Let's take a look at what they are responsible for: -- `priority` - Similar to the same option in other drivers. 
This is the queue's - default priority for each task pushed into this queue if the priority - value for these tasks was not explicitly set. - -- `tube_priority` - The value for specifying the priority within Beanstalk is - the internal priority of the server. The value should not exceed `int32` size. - -- `tube` - The name of the inner "tube" specific to the Beanstalk driver. - -### SQS Driver - -[Amazon SQS (Simple Queue Service)](https://aws.amazon.com/sqs/) is an -alternative queue server developed by Amazon that is part of the AWS -service infrastructure. If you prefer to use the "cloud" option, then you can -use the [ready-made documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configuring.html) -for its installation. - -In addition to the possibility of using this queue server within AWS, you -can also use a local installation of this system on your own servers. If you -prefer this option, then you can use [softwaremill's implementation](https://github.com/softwaremill/elasticmq) -of the Amazon SQS server. - -After you have created the SQS server, you need to specify the following -connection settings in the `sqs` configuration section. Unlike AMQP and Beanstalk, -SQS requires more values to set up a connection, so the section will be different from what -we're used to: - -```yaml -sqs: - # Required AccessKey ID. - # Default: empty - key: access-key - - # Required secret access key. - # Default: empty - secret: api-secret - - # Required AWS region. - # Default: empty - region: us-west-1 - - # Required AWS session token. - # Default: empty - session_token: test - - # Required AWS SQS endpoint to connect. - # Default: http://127.0.0.1:9324 - endpoint: http://127.0.0.1:9324 -``` - -> Please note that although each of the sections contains default values, it is -> marked as "required". This means that in almost all cases they are required to -> be specified in order to correctly configure the driver. 
- -After you have configured the connection - you should configure the queue that -will use this connection: - -```yaml -sqs: - # SQS connection configuration... - -jobs: - pipelines: - # Required section. - # Should be "sqs" for the Amazon SQS driver. - driver: sqs - - # Optional section. - # Default: 10 - prefetch: 10 - - # Optional section. - # Default: 0 - visibility_timeout: 0 - - # Optional section. - # Default: 0 - wait_time_seconds: 0 - - # Optional section. - # Default: default - queue: default - - # Optional section. - # Default: empty - attributes: - DelaySeconds: 42 - # etc... see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html - - # Optional section. - # Default: empty - tags: - test: "tag" -``` - -Below is a more detailed description of each of the SQS-specific options: -- `prefetch` - Number of jobs to prefetch from the SQS. Amazon SQS never returns - more messages than this value (however, fewer messages might be returned). - Valid values: 1 to 10. Any number bigger than 10 will be rounded to 10. - Default: `10`. - -- `visibility_timeout` - The duration (in seconds) that the received messages - are hidden from subsequent retrieve requests after being retrieved by a - ReceiveMessage request. Max value is 43200 seconds (12 hours). Default: `0`. - -- `wait_time_seconds` - The duration (in seconds) for which the call waits for - a message to arrive in the queue before returning. If a message is available, - the call returns sooner than WaitTimeSeconds. If no messages are available and - the wait time expires, the call returns successfully with an empty list of - messages. Default: `5`. - -- `queue` - SQS internal queue name. Can contain alphanumeric characters, - hyphens (-), and underscores (_). Default value is `"default"` string. - -- `attributes` - List of the [AWS SQS attributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html). 
-> For example -> ```yaml -> attributes: -> DelaySeconds: 0 -> MaximumMessageSize: 262144 -> MessageRetentionPeriod: 345600 -> ReceiveMessageWaitTimeSeconds: 0 -> VisibilityTimeout: 30 -> ``` - -- `tags` - Tags don't have any semantic meaning. Amazon SQS interprets tags as - character. -> Please note that this functionality is rarely used and slows down the work of -> queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html - -## Client (Producer) - -Now that we have configured the server, we can start writing our first code for -sending the task to the queue. But before doing this, we need to connect to our -server. And to do this, it is enough to create a `Jobs` instance. - -```php -// Server Connection -$jobs = new Spiral\RoadRunner\Jobs\Jobs(); -``` - -Please note that in this case we have not specified any connection settings. And -this is really not required if this code is executed in a RoadRunner environment. -However, in the case that a connection is required to be established -from a third-party application (for example, a CLI command), then the settings -must be specified explicitly. - -```php -$jobs = new Spiral\RoadRunner\Jobs\Jobs( - // Expects RPC connection - Spiral\Goridge\RPC\RPC::create('tcp://127.0.0.1:6001') -); -``` - -After we have established the connection, we should check the server -availability and in this case the API availability for the jobs. This can be -done using the appropriate `isAvailable()` method. When the connection is -created, and the availability of the functionality is checked, we can connect to -the queue we need using `connect()` method. - -```php -$jobs = new Spiral\RoadRunner\Jobs\Jobs(); - -if (!$jobs->isAvailable()) { - throw new LogicException('The server does not support "jobs" functionality =('); -} - -$queue = $jobs->connect('queue-name'); -``` - -### Task Creation - -Before submitting a task to the queue, you should create this task. 
To create a -task, it is enough to call the corresponding `create()` method. - -```php -$task = $queue->create(SendEmailTask::class); -// Expected: -// object(Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface) -``` - -> Note that the name of the task does not have to be a class. Here we are using -> `SendEmailTask` just for convenience. - -Also, this method takes an additional second argument with additional data to -complete this task. - -```php -$task = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); -``` - -You can also use this task as a basis for creating several others. - -```php -$task = $queue->create(SendEmailTask::class); - -$first = $task->withValue('[email protected]'); -$second = $task->withValue('[email protected]'); -``` - -### Task Dispatching - -And to send tasks to the queue, we can use different methods: -`dispatch()` and `dispatchMany()`. The difference between these two -implementations is that the first one sends a task to the queue, returning a -dispatched task object, while the second one dispatches multiple tasks, -returning an array. Moreover, the second method provides one-time delivery of -all tasks in the array, as opposed to sending each task separately. - -```php -$a = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); -$b = $queue->create(SendEmailTask::class, ['email' => '[email protected]']); - -foreach ([$a, $b] as $task) { - $result = $queue->dispatch($task); - // Expected: - // object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface) -} - -// Using a batching send -$result = $queue->dispatchMany($a, $b); -// Expected: -// array(2) { -// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface), -// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface) -// } -``` - -### Task Immediately Dispatching - -In the case that you do not want to create a new task and then immediately -dispatch it, you can simplify the work by using the `push` method. 
However, this -functionality has a number of limitations. In case of creating a new task: -- You can flexibly configure additional task capabilities using a convenient - fluent interface. -- You can prepare a common task for several others and use it as a basis to - create several alternative tasks. -- You can create several different tasks and collect them into one collection - and send them to the queue at once (using the so-called batching). - -In the case of immediate dispatch, you will have access to only the basic -features: The `push()` method accepts one required argument with the -name of the task and two optional arguments containing additional data for the -task being performed and additional sending options (for example, a delay). -Moreover, this method is designed to send only one task. - -```php -use Spiral\RoadRunner\Jobs\Options; - -$payload = ['email' => $email, 'message' => $message]; - -$task = $queue->push(SendEmailTask::class, $payload, new Options( - delay: 60 // in seconds -)); -``` - -### Task Payload - -As you can see, each task, in addition to the name, can contain additional data -(payload) specific to a certain type of task. You yourself can determine what -data should be transferred to the task and no special requirements are imposed -on them, except for the main ones: Since this task is then sent to the queue, -they must be serializable. - -> The default serializer used in jobs allows you to pass anonymous functions -> as well. - -In case to add additional data, you can use the optional second argument -provided by the `create()` and `push()` methods, or you can use the fluent -interface to supplement or modify the task data. Everything is quite simple -here; you can add data using the `withValue()` method, or delete them using the -`withoutValue()` method. - -The first argument of the `withValue()` method passes a payload value as the -required first argument. 
If you also need to specify a key for it, just pass it -as an optional second argument. - -```php -$task = $queue->create(CreateBackup::class) - ->withValue('/var/www') - ->withValue(42, 'answer') - ->withValue('/dev/null', 'output'); - -// An example like this is completely equivalent to passing -// all this data at one time -$task = $queue->create(CreateBackup::class, [ - '/var/www', - 'answer' => 42, - 'output' => '/dev/null' -]); - -// On the other hand, we don't need an "answer"... -$task = $task->withoutValue('answer'); -``` - -### Task Headers - -In addition to the data itself, we can send additional metadata that is not -related to the payload of the task, that is, headers. In them, we can pass -any additional information, for example: encoding of messages, their format, -the server's IP address, the user's token or session id, etc. - -Headers can only contain string values and are not serialized in any way during -transmission, so be careful when specifying them. - -To add a new header to the task, you can use methods -[similar to PSR-7](https://www.php-fig.org/psr/psr-7/). That is: -- `withHeader(string, iterable<string>|string): self` - Return an instance with - the provided value replacing the specified header. -- `withAddedHeader(string, iterable<string>|string): self` - Return an instance - with the specified header appended with the given value. -- `withoutHeader(string): self` - Return an instance without the specified header. - -```php -$task = $queue->create(RestartServer::class) - ->withValue('127.0.0.1', 'addr') // value first, key second - ->withAddedHeader('access-token', 'IDDQD'); - -$queue->dispatch($task); -``` - -### Task Delayed Dispatching - -If you want to specify that a job should not be immediately available for -processing by a jobs worker, you can use the delayed job option. 
-For example, let's specify that a job shouldn't be available for processing -until 42 minutes after it has been dispatched: - -```php -$task = $queue->create(SendEmailTask::class) - ->withDelay(42 * 60); // 42 min * 60 sec -``` - -## Consumer Usage - -You probably already noticed that when [setting up a jobs consumer](#configuration), -the `"server"` configuration section is used in which a PHP file-handler is defined. -It is exactly the same one we used earlier to write an [HTTP Worker](/php/worker.md). -Does this mean that if we want to use the Jobs Worker, then we can no longer -use the HTTP Worker? No, it does not! - -During the launch of the RoadRunner, it spawns several workers defined in the -`"server"` config section (by default, the number of workers is equal to the -number of CPU cores). At the same time, during the spawn of the workers, it -transmits in advance to each of them information about the *mode* in which this -worker will be used. The information about the *mode* itself is contained in the -environment variable `RR_MODE`: for the HTTP worker the value will be -`"http"`, and for the Jobs worker the value of `"jobs"` will be stored -there. - -![queue-mode](https://user-images.githubusercontent.com/2461257/128106755-cb0d3cb7-3f98-433e-a1c7-1ed92839376a.png) - -There are several ways to check the operating mode from the code: -- By getting the value of the env variable. -- Or using the appropriate API method (from the `spiral/roadrunner-worker` package). - -The second choice may be preferable in cases where you need to change the -RoadRunner's mode, for example, in tests. - -```php -use Spiral\RoadRunner\Environment; -use Spiral\RoadRunner\Environment\Mode; - -// 1. Using global env variable -$isJobsMode = $_SERVER['RR_MODE'] === 'jobs'; - -// 2. 
Using RoadRunner's API -$env = Environment::fromGlobals(); - -$isJobsMode = $env->getMode() === Mode::MODE_JOBS; -``` - -After we are convinced of the specialization of the worker, we can write the -corresponding code for processing tasks. To get information about the available -task in the worker, use the -`$consumer->waitTask(): ReceivedTaskInterface` method. - -```php -use Spiral\RoadRunner\Jobs\Consumer; -use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface; - - -$consumer = new Consumer(); - -/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */ -while ($task = $consumer->waitTask()) { - var_dump($task); -} -``` - -After you receive the task from the queue, you can start processing it in -accordance with the requirements. Don't worry about how much memory or time this -execution takes - the RoadRunner takes over the tasks of managing and -distributing tasks among the workers. - -After you have processed the incoming task, you can execute the -`complete(): void` method. After that, you tell the RoadRunner that you are -ready to handle the next task. - -```php -$consumer = new Spiral\RoadRunner\Jobs\Consumer(); - -while ($task = $consumer->waitTask()) { - - // - // Task handler code - // - - $task->complete(); -} -``` - -We got acquainted with the possibilities of receiving and processing tasks, but -we do not yet know what the received task is. Let's see what data it contains. - -### Task Failing - -In some cases, an error may occur during task processing. In this case, you -should use the `fail()` method, informing the RoadRunner about it. The method -takes two arguments. The first argument is required and expects any string or -string-like (instance of Stringable, for example any exception) value with an -error message. The second is optional and tells the server to restart this task. 
- -```php -$consumer = new Spiral\RoadRunner\Jobs\Consumer(); -$shouldBeRestarted = false; - -while ($task = $consumer->waitTask()) { - try { - // - // Do something... - // - $task->complete(); - } catch (\Throwable $e) { - $task->fail($e, $shouldBeRestarted); - } -} -``` - -In the case that the next time you restart the task, you should update the -headers, you can use the appropriate method by adding or changing the headers -of the received task. - -```php -$task - ->withHeader('attempts', (int)$task->getHeaderLine('attempts') - 1) - ->withHeader('retry-delay', (int)$task->getHeaderLine('retry-delay') * 2) - ->fail('Something went wrong', requeue: true) -; -``` - -In addition, you can re-specify the task execution delay. For example, in the -code above, you may have noticed the use of a custom header `"retry-delay"`, the -value of which doubled after each restart, so this value can be used to specify -the delay in the next task execution. - -```php -$task - ->withDelay((int)$task->getHeaderLine('retry-delay')) - ->fail('Something went wrong', true) -; -``` - -### Received Task ID - -Each task in the queue has a **unique** identifier. This allows you to -unambiguously identify the task among all existing tasks in all queues, no -matter what name it was received from. - -In addition, it is worth paying attention to the fact that the identifier is not -a sequential number that increases indefinitely. It means that there is still a -chance of an identifier collision, but it is about 1/2.71 quintillion. Even if -you send 1 billion tasks per second, it will take you about 85 years for an ID -collision to occur. - -```php -echo $task->getId(); -// Expected Result -// string(36) "88ca6810-eab9-473d-a8fd-4b4ae457b7dc" -``` - -In the case that you want to store this identifier in the database, it is -recommended to use a binary representation (16 bytes long if your DB requires -blob sizes). 
- -```php -$binary = hex2bin(str_replace('-', '', $task->getId())); -// Expected Result -// string(16) b"ˆÊh\x10ê¹G=¨ýKJäW·Ü" -``` - -### Received Task Queue - -Since a worker can process several different queues at once, you may need to -somehow determine from which queue the task came. To get the name of the queue, -use the `getQueue(): string` method. - -```php -echo $task->getQueue(); -// Expected -// string(13) "example-queue" -``` - -For example, you can select different task handlers based on different types of -queues. - -```php -// This is just an example of a handler -$handler = $container->get(match($task->getQueue()) { - 'emails' => 'email-handler', - 'billing' => 'billing-handler', - default => throw new InvalidArgumentException('Unprocessable queue [' . $task->getQueue() . ']') -}); - -$handler->process($task); -``` - -### Received Task Name - -The task name is some identifier associated with a specific type of task. For -example, it may contain the name of the task class so that in the future we can -create an object of this task by passing the required data there. To get the -name of the task, use the `getName(): string` method. - -```php -echo $task->getName(); -// Expected -// string(21) "App\\Queue\\Task\\EmailTask" -``` - -Thus, we can implement the creation of a specific task with certain data for -this task. - -```php -$class = $task->getName(); - -if (!class_exists($class)) { - throw new InvalidArgumentException("Unprocessable task [$class]"); -} - -$handler->process($class::fromTask($task)); -``` - -### Received Task Payload - -Each task contains a set of arbitrary user data to be processed within the task. -To obtain this data, you can use one of the available methods: - -**getValue** - -Method `getValue()` returns a specific payload value by key or `null` if no -value was passed. 
If you want to use a different default value (for cases where -no payload value with the given key was passed), pass it as the second -argument. - -```php -if ($task->getName() !== SendEmailTask::class) { - throw new InvalidArgumentException('Does not look like a mail task'); -} - -echo $task->getValue('email'); // "[email protected]" -echo $task->getValue('username', 'Guest'); // "John" -``` - -**hasValue** - -To check whether a value exists in the payload, use the `hasValue()` method. -This method returns `true` if a value for the given key was passed and `false` -otherwise. - -```php -if (!$task->hasValue('email')) { - throw new InvalidArgumentException('The "email" value is required for this task'); -} - -$email->sendTo($task->getValue('email')); -``` - -**getPayload** - -You can also get all the data at once in `array(string|int $key => mixed $value)` -format using the `getPayload` method. This method is useful when transferring -all the data to a DTO. - -```php -$class = $task->getName(); -$arguments = $task->getPayload(); - -$dto = new $class(...$arguments); -``` - -Note that the array can contain both `int` and `string` keys, so you must take -care to pass them to the constructor correctly yourself. For example, the code -above works fully correctly only on PHP >= 8.1. On earlier versions of the -language, you should use the -[reflection functionality](https://www.php.net/manual/ru/reflectionclass.newinstanceargs.php), -or pass the payload in some other way. - -Since the handler process is not the one that put the task in the queue, any -object you send to the queue is serialized and then automatically unpacked in -the handler. The default serializer is suitable for most cases, so you can even -pass `Closure` instances. 
However, for specific data types, you should manage -their packing and unpacking yourself, either by replacing the serializer -completely or by handling an individual value separately. In this case, do not -forget to apply the same change on both the client and the consumer side. - -### Received Task Headers - -If you need to get additional information that is not directly related to the -task data, use headers. - -For example, headers can convey information about the serializer, encoding, or -other metadata. - -```php -$message = $task->getValue('message'); -$encoding = $task->getHeaderLine('encoding'); - -if (strtolower($encoding) !== 'utf-8') { - $message = iconv($encoding, 'utf-8', $message); -} -``` - -The interface for reading headers closely mirrors -[PSR-7](https://www.php-fig.org/psr/psr-7/), so the following methods are -available: -- `getHeaders(): array<string, array<string, string>>` - Retrieves all task - header values. -- `hasHeader(string): bool` - Checks if a header exists by the given name. -- `getHeader(string): array<string, string>` - Retrieves a message header value - by the given name. -- `getHeaderLine(string): string` - Retrieves a comma-separated string of the - values for a single header by the given name. - -We got acquainted with the data and capabilities that we have in the consumer. -Let's now get down to the basics - sending these messages. - -## Advanced Functionality - -In addition to the main functionality for sending and processing tasks, the -API has additional functionality that is not directly related to these -operations. Now that we have examined the main functionality, it's time to look -at the advanced features. - -### Creating A New Queue - -In the very [first chapter](/beep-beep/jobs.md#configuration), we got acquainted -with the queue settings and drivers for them. 
We can do much the same thing from -PHP code by calling the `create()` method on a `Jobs` instance. - -To create a new queue, the following types of DTO are available to you: - -- `Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo` for AMQP queues. -- `Spiral\RoadRunner\Jobs\Queue\BeanstalkCreateInfo` for Beanstalk queues. -- `Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo` for in-memory queues. -- `Spiral\RoadRunner\Jobs\Queue\SQSCreateInfo` for SQS queues. - -Such a DTO with the appropriate settings should be passed to the `create()` -method to create the corresponding queue: - -```php -use Spiral\RoadRunner\Jobs\Jobs; -use Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo; - -$jobs = new Jobs(); - -// -// Create a new "example" in-memory queue -// -$queue = $jobs->create(new MemoryCreateInfo( - name: 'example', - priority: 42, - prefetch: 10, -)); -``` - -### Getting A List Of Queues - -To get a list of all available queues, iterate over the `Jobs` object with the -`foreach` operator. Each element of this collection corresponds to a specific -queue registered in RoadRunner. To get just the number of available queues, -pass the `Jobs` object to the `count()` function. - -```php -$jobs = new Spiral\RoadRunner\Jobs\Jobs(); - -foreach ($jobs as $queue) { - var_dump($queue->getName()); - // Expects name of the queue -} - -$count = count($jobs); -// Expects the number of queues -``` - -### Pausing A Queue - -In addition to creating new queues, there may be times when the processing of -a queue needs to be suspended. Such cases can arise, for example, when -deploying a new version of an application, when task processing should be -suspended while the new application code is deployed. - -In this case, the code is pretty simple. It is enough to call the `pause()` -method, passing the names of the queues to it. 
To resume the queues (unpause), -call the similar `resume()` method. - -```php -$jobs = new Spiral\RoadRunner\Jobs\Jobs(); - -// Pause "emails", "billing" and "backups" queues. -$jobs->pause('emails', 'billing', 'backups'); - -// Resuming only "emails" and "billing". -$jobs->resume('emails', 'billing'); -``` - -## RPC Interface - -All communication between PHP and Go is made via RPC calls with protobuf payloads. -You can find versioned proto-payloads here: [Proto](https://github.com/spiral/roadrunner/blob/e9713a1d08a93e2be70c889c600ed89f54822b54/proto/jobs/v1beta). - -- `Push(in *jobsv1beta.PushRequest, out *jobsv1beta.Empty) error` - The - arguments: the first argument is a `PushRequest`, which contains one field - of the `Job` being sent to the queue; the second argument is `Empty`, which - means that the function does not return a result (returns nothing). An error - is returned if the request fails. - -- `PushBatch(in *jobsv1beta.PushBatchRequest, out *jobsv1beta.Empty) error` - - The arguments: the first argument is a `PushBatchRequest`, which contains one - repeated (list) field of the `Job` being sent to the queue; the second - argument is `Empty`, which means that the function does not return a result. - An error is returned if the request fails. - -- `Pause(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The arguments: - the first argument is a `Pipelines`, which contains one repeated (list) - field with the `string` names of the queues to be paused; the second - argument is `Empty`, which means that the function does not return a result. - An error is returned if the request fails. - -- `Resume(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The - arguments: the first argument is a `Pipelines`, which contains one - repeated (list) field with the `string` names of the queues to be resumed; the - second argument is `Empty`, which means that the function does not return a - result. 
An error is returned if the request fails. - -- `List(in *jobsv1beta.Empty, out *jobsv1beta.Pipelines) error` - The - arguments: the first argument is an `Empty`, meaning that the function does - not accept anything (from the point of view of the PHP API, an empty string - should be passed); the second argument is `Pipelines`, which contains one - repeated (list) field with the `string` names of all available queues. - An error is returned if the request fails. - -- `Declare(in *jobsv1beta.DeclareRequest, out *jobsv1beta.Empty) error` - The - arguments: the first argument is a `DeclareRequest`, which contains one - `map<string, string>` pipeline field of queue configuration; the second - argument is `Empty`, which means that the function does not return a result. - An error is returned if the request fails. - -- `Stat(in *jobsv1beta.Empty, out *jobsv1beta.Stats) error` - The arguments: - the first argument is an `Empty`, meaning that the function does not accept - anything (from the point of view of the PHP API, an empty string should be - passed); the second argument is `Stats`, which contains one repeated (list) - field named `Stats` of type `Stat`. An error is returned if the request fails. - - -From the PHP point of view, such requests (`List` for example) look as follows: -```php -use Spiral\Goridge\RPC\RPC; -use Spiral\Goridge\RPC\Codec\ProtobufCodec; -use Spiral\RoadRunner\Jobs\DTO\V1\Maintenance; - -$response = RPC::create('tcp://127.0.0.1:6001') - ->withServicePrefix('jobs') - ->withCodec(new ProtobufCodec()) - ->call('List', '', Maintenance::class); -``` |