summaryrefslogtreecommitdiff
path: root/docs/beep-beep/jobs.md
diff options
context:
space:
mode:
Diffstat (limited to 'docs/beep-beep/jobs.md')
-rw-r--r--docs/beep-beep/jobs.md1166
1 files changed, 1166 insertions, 0 deletions
diff --git a/docs/beep-beep/jobs.md b/docs/beep-beep/jobs.md
new file mode 100644
index 00000000..3aa4e1c0
--- /dev/null
+++ b/docs/beep-beep/jobs.md
@@ -0,0 +1,1166 @@
+# Jobs
+
+Starting with RoadRunner >= 2.4, a queuing system (aka "jobs") is available.
+This plugin allows you to move arbitrary "heavy" code into separate tasks to
+execute them asynchronously in an external worker, which will be referred to
+as "consumer" in this documentation.
+
+The RoadRunner PHP library provides both API implementations: The client one,
+which allows you to dispatch tasks, and the server one, which provides the
+consumer who processes the tasks.
+
+![queue](https://user-images.githubusercontent.com/2461257/128100380-2d4df71a-c86e-4d5d-a58e-a3d503349200.png)
+
+## Installation
+
+> **Requirements**
+> - PHP >= 7.4
+> - RoadRunner >= 2.4
+> - *ext-protobuf (optional)*
+
+To get access from the PHP code, you should put the corresponding dependency
+using [the Composer](https://getcomposer.org/).
+
+```sh
+$ composer require spiral/roadrunner-jobs
+```
+
+## Configuration
+
+After installing all the required dependencies, you need to configure this
+plugin. To enable it add `jobs` section to your configuration.
+
+For example, in this way, you can configure both the client and server parts to
+work with RabbitMQ.
+
+```yaml
+#
+# RPC is required for tasks dispatching (client)
+#
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+#
+# This section configures the task consumer (server)
+#
+server:
+ command: php consumer.php
+ relay: pipes
+
+#
+# In this section, the jobs themselves are configured
+#
+jobs:
+ consume: [ "test" ] # List of RoadRunner queues that can be processed by
+ # the consumer specified in the "server" section.
+ pipelines:
+ test: # RoadRunner queue identifier
+ driver: memory # - Queue driver name
+ queue: test # - Internal (driver's) queue identifier
+```
+
+- The `rpc` section is responsible for client settings. It is at this address
+ that we will connect, *dispatching tasks* to the queue.
+
+- The `server` section is responsible for configuring the server. Previously, we
+ have already met with its description when setting up the [PHP Worker](/php/worker.md).
+
+- And finally, the `jobs` section is responsible for the work of the queues
+ themselves. It contains information on how the RoadRunner should work with
+ connections to drivers, what can be handled by the consumer, and other
+ queue-specific settings.
+
+### Common Configuration
+
+Let's now focus on the common settings of the queue server. In full, it may
+look like this:
+
+```yaml
+jobs:
+ num_pollers: 64
+ timeout: 60
+ pipeline_size: 100000
+ pool:
+ num_workers: 10
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+ consume: [ "queue-name" ]
+ pipelines:
+ queue-name:
+ driver: # "[DRIVER_NAME]"
+ # And driver-specific configuration below...
+```
+
+Above is a complete list of all possible common Jobs settings. Let's now figure
+out what they are responsible for.
+
+- `num_pollers` - The number of threads that concurrently read from the priority
+ queue and send payloads to the workers. There is no optimal number, it's
+ heavily dependent on the PHP worker's performance. For example, "echo workers"
+ may process over 300k jobs per second within 64 pollers (on 32 core CPU).
+
+- `timeout` - The internal timeouts via golang context (in seconds). For
+ example, if the connection was interrupted or your push in the middle of the
+ redial state with 10 minutes timeout (but our timeout is 1 min for example),
+ or queue is full. If the timeout exceeds, your call will be rejected with an
+ error. Default: 60 (seconds).
+
+- `pipeline_size` - The "binary heaps" priority queue (PQ) settings. Priority
+ queue stores jobs inside according to its' priorities. Priority might be set
+ for the job or inherited by the pipeline. If worker performance is poor, PQ
+ will accumulate jobs until `pipeline_size` will be reached. After that, PQ
+ will be blocked until workers process all the jobs inside.
+
+ Blocked PQ means, that you can push the job into the driver, but RoadRunner
+ will not read that job until PQ will be empty. If RoadRunner will be killed
+ with jobs inside the PQ, they won't be lost, because jobs are deleted from the
+ drivers' queue only after Ack.
+
+- `pool` - All settings in this section are similar to the worker pool settings
+ described on the [configuration page](https://roadrunner.dev/docs/intro-config).
+
+- `consume` - Contains an array of the names of all queues specified in the
+ `"pipelines"` section, which should be processed by the concierge specified in
+ the global `"server"` section (see the [PHP worker's settings](/php/worker.md)).
+
+- `pipelines` - This section contains a list of all queues declared in the
+ RoadRunner. The key is a unique *queue identifier*, and the value is an object
+ from the settings specific to each driver (we will talk about it later).
+
+### Memory Driver
+
+This type of driver is already supported by the RoadRunner and does not require
+any additional installations.
+
+Note that using this type of queue driver, all data is in memory and will be
+destroyed when the RoadRunner Server is restarted. If you need persistent
+queue, then it is recommended to use alternative drivers: `amqp`, `beanstalk`
+or `sqs`.
+
+The complete `memory` driver configuration looks like this:
+
+```yaml
+jobs:
+ pipelines:
+ # User defined name of the queue.
+ example:
+ # Required section.
+ # Should be "memory" for the in-memory driver.
+ driver: memory
+
+ # Optional section.
+ # Default: 10
+ priority: 10
+
+ # Optional section.
+ # Default: 10
+ prefetch: 10
+```
+
+Below is a more detailed description of each of the in-memory-specific options:
+- `priority` - Queue default priority for each task pushed into this queue
+ if the priority value for these tasks was not explicitly set.
+
+- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the
+ PQ size is set to 100 and prefetch to 100000, you'll be able to push up to
+ prefetch number of jobs even if PQ is full.
+
+> Please note that this driver cannot hold more than 1000 tasks with delay at
+> the same time (RR limitation)
+
+### Local (based on the boltdb) Driver
+
+This type of driver is already supported by the RoadRunner and does not require
+any additional installations. It uses boltdb as its main storage for the jobs. This driver should be used locally, for
+testing or developing purposes. It can be used in the production, but this type of driver can't handle
+huge load. Maximum RPS it can have no more than 30-50.
+
+Data in this driver persists in the boltdb database file. You can't open same file simultaneously for the 2 pipelines or
+for the KV plugin and Jobs plugin. This is boltdb limitation on concurrent access from the 2 processes to the same file.
+
+The complete `boltdb` driver configuration looks like this:
+
+```yaml
+
+boltdb:
+ permissions: 0777
+
+jobs:
+ pipelines:
+ # User defined name of the queue.
+ example:
+ # Required section.
+ # Should be "boltdb" for the local driver.
+ driver: boltdb
+
+ # Optional section.
+ # Default: 10
+ priority: 10
+
+ # Optional section.
+ # Default: 1000
+ prefetch: 1000
+```
+
+Below is a more detailed description of each of the in-memory-specific options:
+- `priority` - Queue default priority for each task pushed into this queue
+ if the priority value for these tasks was not explicitly set.
+
+- `prefetch` - A local buffer between the PQ (priority queue) and driver. If the
+ PQ size is set to 100 and prefetch to 100000, you'll be able to push up to
+ prefetch number of jobs even if PQ is full.
+
+- `file` - boltdb database file to use. Might be a path with file: `foo/bar/rr1.db`. Default: `rr.db`.
+
+
+### AMQP Driver
+
+Strictly speaking, AMQP (and 0.9.1 version used) is a protocol, not a full-fledged driver, so you can use
+any servers that support this protocol (on your own, only rabbitmq was tested) , such as:
+[RabbitMQ](https://www.rabbitmq.com/), [Apache Qpid](http://qpid.apache.org/) or
+[Apache ActiveMQ](http://activemq.apache.org/). However, it is recommended to
+use RabbitMQ as the main implementation, and reliable performance with other
+implementations is not guaranteed.
+
+To install and configure the RabbitMQ, use the corresponding
+[documentation page](https://www.rabbitmq.com/download.html). After that, you
+should configure the connection to the server in the "`amqp`" section. This
+configuration section contains exactly one `addr` key with a
+[connection DSN](https://www.rabbitmq.com/uri-spec.html).
+
+```yaml
+amqp:
+ addr: amqp://guest:guest@localhost:5672
+```
+
+After creating a connection to the server, you can create a new queue that will
+use this connection and which will contain the queue settings (including
+amqp-specific):
+
+```yaml
+amqp:
+ addr: amqp://guest:guest@localhost:5672
+
+
+jobs:
+ pipelines:
+ # User defined name of the queue.
+ example:
+ # Required section.
+ # Should be "amqp" for the AMQP driver.
+ driver: amqp
+
+ # Optional section.
+ # Default: 10
+ priority: 10
+
+ # Optional section.
+ # Default: 100
+ prefetch: 100
+
+ # Optional section.
+ # Default: "default"
+ queue: "default"
+
+ # Optional section.
+ # Default: "amqp.default"
+ exchange: "amqp.default"
+
+ # Optional section.
+ # Default: "direct"
+ exchange_type: "direct"
+
+ # Optional section.
+ # Default: "" (empty)
+ routing_key: ""
+
+ # Optional section.
+ # Default: false
+ exclusive: false
+
+ # Optional section.
+ # Default: false
+ multiple_ack: false
+
+ # Optional section.
+ # Default: false
+ requeue_on_fail: false
+```
+
+Below is a more detailed description of each of the amqp-specific options:
+- `priority` - Queue default priority for for each task pushed into this queue
+ if the priority value for these tasks was not explicitly set.
+
+- `prefetch` - The client can request that messages be sent in advance so that
+ when the client finishes processing a message, the following message is
+ already held locally, rather than needing to be sent down the channel.
+ Prefetching gives a performance improvement. This field specifies the prefetch
+ window size in octets. See also ["prefetch-size"](https://www.rabbitmq.com/amqp-0-9-1-reference.html)
+ in AMQP QoS documentation reference.
+
+- `queue` - AMQP internal (inside the driver) queue name.
+
+- `exchange` - The name of AMQP exchange to which tasks are sent. Exchange
+ distributes the tasks to one or more queues. It routes tasks to the queue
+ based on the created bindings between it and the queue. See also
+ ["AMQP model"](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model)
+ documentation section.
+
+- `exchange_type` - The type of task delivery. May be one of `direct`, `topics`,
+ `headers` or `fanout`.
+ - `direct` - Used when a task needs to be delivered to specific queues. The
+ task is published to an exchanger with a specific routing key and goes to
+ all queues that are associated with this exchanger with a similar routing
+ key.
+ - `topics` - Similarly, `direct` exchange enables selective routing by
+ comparing the routing key. But, in this case, the key is set using a
+ template, like: `user.*.messages`.
+ - `fanout` - All tasks are delivered to all queues even if a routing key is
+ specified in the task.
+ - `headers` - Routes tasks to related queues based on a comparison of the
+ (key, value) pairs of the headers property of the binding and the similar
+ property of the message.
+
+ - `routing_key` - Queue's routing key.
+
+ - `exclusive` - Exclusive queues can't be redeclared. If set to true and
+ you'll try to declare the same pipeline twice, that will lead to an error.
+
+ - `multiple_ack` - This delivery and all prior unacknowledged deliveries on
+ the same channel will be acknowledged. This is useful for batch processing
+ of deliveries. Applicable only for the Ack, not for the Nack.
+
+ - `requeue_on_fail` - Requeue on Nack.
+
+### Beanstalk Driver
+
+Beanstalk is a simple and fast general purpose work queue. To install Beanstalk,
+you can use the [local queue server](https://github.com/beanstalkd/beanstalkd)
+or run the server inside [AWS Elastic](https://aws.amazon.com/elasticbeanstalk/).
+You can choose any option that is convenient for you.
+
+Setting up the server is similar to setting up AMQP and requires specifying the
+connection in the `"beanstalk"` section of your RoadRunner configuration file.
+
+```yaml
+beanstalk:
+ addr: tcp://127.0.0.1:11300
+```
+
+After setting up the connection, you can start using it. Let's take a look at
+the complete config with all the options for this driver:
+
+```yaml
+beanstalk:
+ # Optional section.
+ # Default: tcp://127.0.0.1:11300
+ addr: tcp://127.0.0.1:11300
+
+ # Optional section.
+ # Default: 30s
+ timeout: 10s
+
+jobs:
+ pipelines:
+ # User defined name of the queue.
+ example:
+ # Required section.
+ # Should be "beanstalk" for the Beanstalk driver.
+ driver: beanstalk
+
+ # Optional section.
+ # Default: 10
+ priority: 10
+
+ # Optional section.
+ # Default: 1
+ tube_priority: 1
+
+ # Optional section.
+ # Default: default
+ tube: default
+
+ # Optional section.
+ # Default: 5s
+ reserve_timeout: 5s
+```
+
+These are all settings that are available to you for configuring this type of
+driver. Let's take a look at what they are responsible for:
+- `priority` - Similar to the same option in other drivers. This is queue
+ default priority for for each task pushed into this queue if the priority
+ value for these tasks was not explicitly set.
+
+- `tube_priority` - The value for specifying the priority within Beanstalk is
+ the internal priority of the server. The value should not exceed `int32` size.
+
+- `tube` - The name of the inner "tube" specific to the Beanstalk driver.
+
+### SQS Driver
+
+[Amazon SQS (Simple Queue Service)](https://aws.amazon.com/sqs/) is an
+alternative queue server also developed by Amazon and is also part of the AWS
+service infrastructure. If you prefer to use the "cloud" option, then you can
+use the [ready-made documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configuring.html)
+for its installation.
+
+In addition to the possibility of using this queue server within the AWS, you
+can also use the local installation of this system on your own servers. If you
+prefer this option, then you can use [softwaremill's implementation](https://github.com/softwaremill/elasticmq)
+of the Amazon SQS server.
+
+After you have created the SQS server, you need to specify the following
+connection settings in `sqs` configuration settings. Unlike AMQP and Beanstalk,
+SQS requires more values to set up a connection and will be different from what
+we're used to:
+
+```yaml
+sqs:
+ # Required AccessKey ID.
+ # Default: empty
+ key: access-key
+
+ # Required secret access key.
+ # Default: empty
+ secret: api-secret
+
+ # Required AWS region.
+ # Default: empty
+ region: us-west-1
+
+ # Required AWS session token.
+ # Default: empty
+ session_token: test
+
+ # Required AWS SQS endpoint to connect.
+ # Default: http://127.0.0.1:9324
+ endpoint: http://127.0.0.1:9324
+```
+
+> Please note that although each of the sections contains default values, it is
+> marked as "required". This means that in almost all cases they are required to
+> be specified in order to correctly configure the driver.
+
+After you have configured the connection - you should configure the queue that
+will use this connection:
+
+```yaml
+sqs:
+ # SQS connection configuration...
+
+jobs:
+ pipelines:
+ # Required section.
+ # Should be "sqs" for the Amazon SQS driver.
+ driver: sqs
+
+ # Optional section.
+ # Default: 10
+ prefetch: 10
+
+ # Optional section.
+ # Default: 0
+ visibility_timeout: 0
+
+ # Optional section.
+ # Default: 0
+ wait_time_seconds: 0
+
+ # Optional section.
+ # Default: default
+ queue: default
+
+ # Optional section.
+ # Default: empty
+ attributes:
+ DelaySeconds: 42
+ # etc... see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+
+ # Optional section.
+ # Default: empty
+ tags:
+ test: "tag"
+```
+
+Below is a more detailed description of each of the SQS-specific options:
+- `prefetch` - Number of jobs to prefetch from the SQS. Amazon SQS never returns
+ more messages than this value (however, fewer messages might be returned).
+ Valid values: 1 to 10. Any number bigger than 10 will be rounded to 10.
+ Default: `10`.
+
+- `visibility_timeout` - The duration (in seconds) that the received messages
+ are hidden from subsequent retrieve requests after being retrieved by a
+ ReceiveMessage request. Max value is 43200 seconds (12 hours). Default: `0`.
+
+- `wait_time_seconds` - The duration (in seconds) for which the call waits for
+ a message to arrive in the queue before returning. If a message is available,
+ the call returns sooner than WaitTimeSeconds. If no messages are available and
+ the wait time expires, the call returns successfully with an empty list of
+ messages. Default: `5`.
+
+- `queue` - SQS internal queue name. Can contain alphanumeric characters,
+ hyphens (-), and underscores (_). Default value is `"default"` string.
+
+- `attributes` - List of the [AWS SQS attributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html).
+> For example
+> ```yaml
+> attributes:
+> DelaySeconds: 0
+> MaximumMessageSize: 262144
+> MessageRetentionPeriod: 345600
+> ReceiveMessageWaitTimeSeconds: 0
+> VisibilityTimeout: 30
+> ```
+
+- `tags` - Tags don't have any semantic meaning. Amazon SQS interprets tags as
+ character.
+> Please note that this functionality is rarely used and slows down the work of
+> queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html
+
+## Client (Producer)
+
+Now that we have configured the server, we can start writing our first code for
+sending the task to the queue. But before doing this, we need to connect to our
+server. And to do this, it is enough to create a `Jobs` instance.
+
+```php
+// Server Connection
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+```
+
+Please note that in this case we have not specified any connection settings. And
+this is really not required if this code is executed in a RoadRunner environment.
+However, in the case that a connection is required to be established
+from a third-party application (for example, a CLI command), then the settings
+must be specified explicitly.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs(
+ // Expects RPC connection
+ Spiral\Goridge\RPC\RPC::create('tcp://127.0.0.1:6001')
+);
+```
+
+After we have established the connection, we should check the server
+availability and in this case the API availability for the jobs. This can be
+done using the appropriate `isAvailable()` method. When the connection is
+created, and the availability of the functionality is checked, we can connect to
+the queue we need using `connect()` method.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+
+if (!$jobs->isAvailable()) {
+ throw new LogicException('The server does not support "jobs" functionality =(');
+}
+
+$queue = $jobs->connect('queue-name');
+```
+
+### Task Creation
+
+Before submitting a task to the queue, you should create this task. To create a
+task, it is enough to call the corresponding `create()` method.
+
+```php
+$task = $queue->create(SendEmailTask::class);
+// Expected:
+// object(Spiral\RoadRunner\Jobs\Task\PreparedTaskInterface)
+```
+
+> Note that the name of the task does not have to be a class. Here we are using
+> `SendEmailTask` just for convenience.
+
+Also, this method takes an additional second argument with additional data to
+complete this task.
+
+```php
+$task = $queue->create(SendEmailTask::class, ['email' => '[email protected]']);
+```
+
+You can also use this task as a basis for creating several others.
+
+```php
+$task = $queue->create(SendEmailTask::class);
+
+$first = $task->withValue('[email protected]');
+$second = $task->withValue('[email protected]');
+```
+
+### Task Dispatching
+
+And to send tasks to the queue, we can use different methods:
+`dispatch()` and `dispatchMany()`. The difference between these two
+implementations is that the first one sends a task to the queue, returning a
+dispatched task object, while the second one dispatches multiple tasks,
+returning an array. Moreover, the second method provides one-time delivery of
+all tasks in the array, as opposed to sending each task separately.
+
+```php
+$a = $queue->create(SendEmailTask::class, ['email' => '[email protected]']);
+$b = $queue->create(SendEmailTask::class, ['email' => '[email protected]']);
+
+foreach ([$a, $b] as $task) {
+ $result = $queue->dispatch($task);
+ // Expected:
+ // object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface)
+}
+
+// Using a batching send
+$result = $queue->dispatchMany($a, $b);
+// Expected:
+// array(2) {
+// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface),
+// object(Spiral\RoadRunner\Jobs\Task\QueuedTaskInterface)
+// }
+```
+
+### Task Immediately Dispatching
+
+In the case that you do not want to create a new task and then immediately
+dispatch it, you can simplify the work by using the `push` method. However, this
+functionality has a number of limitations. In case of creating a new task:
+- You can flexibly configure additional task capabilities using a convenient
+ fluent interface.
+- You can prepare a common task for several others and use it as a basis to
+ create several alternative tasks.
+- You can create several different tasks and collect them into one collection
+ and send them to the queue at once (using the so-called batching).
+
+In the case of immediate dispatch, you will have access to only the basic
+features: The `push()` method accepts one required argument with the
+name of the task and two optional arguments containing additional data for the
+task being performed and additional sending options (for example, a delay).
+Moreover, this method is designed to send only one task.
+
+```php
+use Spiral\RoadRunner\Jobs\Options;
+
+$payload = ['email' => $email, 'message' => $message];
+
+$task = $queue->push(SendEmailTask::class, $payload, new Options(
+ delay: 60 // in seconds
+));
+```
+
+### Task Payload
+
+As you can see, each task, in addition to the name, can contain additional data
+(payload) specific to a certain type of task. You yourself can determine what
+data should be transferred to the task and no special requirements are imposed
+on them, except for the main ones: Since this task is then sent to the queue,
+they must be serializable.
+
+> The default serializer used in jobs allows you to pass anonymous functions
+> as well.
+
+In case to add additional data, you can use the optional second argument
+provided by the `create()` and `push()` methods, or you can use the fluent
+interface to supplement or modify the task data. Everything is quite simple
+here; you can add data using the `withValue()` method, or delete them using the
+`withoutValue()` method.
+
+The first argument of the `withValue()` method passes a payload value as the
+required first argument. If you also need to specify a key for it, just pass it
+as an optional second argument.
+
+```php
+$task = $queue->create(CreateBackup::class)
+ ->withValue('/var/www')
+ ->withValue(42, 'answer')
+ ->withValue('/dev/null', 'output');
+
+// An example like this will be completely equivalent to if we passed
+// all this data at one time
+$task = $queue->create(CreateBackup::class, [
+ '/var/www',
+ 'answer' => 42,
+ 'output' => '/dev/null'
+]);
+
+// On the other hand, we don't need an "answer"...
+$task = $task->withoutValue('answer');
+```
+
+### Task Headers
+
+In addition to the data itself, we can send additional metadata that is not
+related to the payload of the task, that is, headers. In them, we can pass
+any additional information, for example: Encoding of messages, their format,
+the server's IP address, the user's token or session id, etc.
+
+Headers can only contain string values and are not serialized in any way during
+transmission, so be careful when specifying them.
+
+In the case to add a new header to the task, you can use methods
+[similar to PSR-7](https://www.php-fig.org/psr/psr-7/). That is:
+- `withHeader(string, iterable<string>|string): self` - Return an instance with
+ the provided value replacing the specified header.
+- `withAddedHeader(string, iterable<string>|string): self` - Return an instance
+ with the specified header appended with the given value.
+- `withoutHeader(string): self` - Return an instance without the specified header.
+
+```php
+$task = $queue->create(RestartServer::class)
+ ->withValue('addr', '127.0.0.1')
+ ->withAddedHeader('access-token', 'IDDQD');
+
+$queue->dispatch($task);
+```
+
+### Task Delayed Dispatching
+
+If you want to specify that a job should not be immediately available for
+processing by a jobs worker, you can use the delayed job option.
+For example, let's specify that a job shouldn't be available for processing
+until 42 minutes after it has been dispatched:
+
+```php
+$task = $queue->create(SendEmailTask::class)
+ ->withDelay(42 * 60); // 42 min * 60 sec
+```
+
+## Consumer Usage
+
+You probably already noticed that when [setting up a jobs consumer](#configuration),
+the `"server"` configuration section is used in which a PHP file-handler is defined.
+Exactly the same one we used earlier to write a [HTTP Worker](/php/worker.md).
+Does this mean that if we want to use the Jobs Worker, then we can no longer
+use the HTTP Worker? No it is not!
+
+During the launch of the RoadRunner, it spawns several workers defined in the
+`"server"` config section (by default, the number of workers is equal to the
+number of CPU cores). At the same time, during the spawn of the workers, it
+transmits in advance to each of them information about the *mode* in which this
+worker will be used. The information about the *mode* itself is contained in the
+environment variable `RR_ENV` and for the HTTP worker the value will correspond
+to the `"http"`, and for the Jobs worker the value of `"jobs"` will be stored
+there.
+
+![queue-mode](https://user-images.githubusercontent.com/2461257/128106755-cb0d3cb7-3f98-433e-a1c7-1ed92839376a.png)
+
+There are several ways to check the operating mode from the code:
+- By getting the value of the env variable.
+- Or using the appropriate API method (from the `spiral/roadrunner-worker` package).
+
+The second choice may be more preferable in cases where you need to change the
+RoadRunner's mode, for example, in tests.
+
+```php
+use Spiral\RoadRunner\Environment;
+use Spiral\RoadRunner\Environment\Mode;
+
+// 1. Using global env variable
+$isJobsMode = $_SERVER['RR_MODE'] === 'jobs';
+
+// 2. Using RoadRunner's API
+$env = Environment::fromGlobals();
+
+$isJobsMode = $env->getMode() === Mode::MODE_JOBS;
+```
+
+After we are convinced of the specialization of the worker, we can write the
+corresponding code for processing tasks. To get information about the available
+task in the worker, use the
+`$consumer->waitTask(): ReceivedTaskInterface` method.
+
+```php
+use Spiral\RoadRunner\Jobs\Consumer;
+use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;
+
+
+$consumer = new Consumer();
+
+/** @var Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface $task */
+while ($task = $consumer->waitTask()) {
+ var_dump($task);
+}
+```
+
+After you receive the task from the queue, you can start processing it in
+accordance with the requirements. Don't worry about how much memory or time this
+execution takes - the RoadRunner takes over the tasks of managing and
+distributing tasks among the workers.
+
+After you have processed the incoming task, you can execute the
+`complete(): void` method. After that, you tell the RoadRunner that you are
+ready to handle the next task.
+
+```php
+$consumer = new Spiral\RoadRunner\Jobs\Consumer();
+
+while ($task = $consumer->waitTask()) {
+
+ //
+ // Task handler code
+ //
+
+ $task->complete();
+}
+```
+
+We got acquainted with the possibilities of receiving and processing tasks, but
+we do not yet know what the received task is. Let's see what data it contains.
+
+### Task Failing
+
+In some cases, an error may occur during task processing. In this case, you
+should use the `fail()` method, informing the RoadRunner about it. The method
+takes two arguments. The first argument is required and expects any string or
+string-like (instance of Stringable, for example any exception) value with an
+error message. The second is optional and tells the server to restart this task.
+
+```php
+$consumer = new Spiral\RoadRunner\Jobs\Consumer();
+$shouldBeRestarted = false;
+
+while ($task = $consumer->waitTask()) {
+ try {
+ //
+ // Do something...
+ //
+ $task->complete();
+ } catch (\Throwable $e) {
+ $task->fail($e, $shouldBeRestarted);
+ }
+}
+```
+
+In the case that the next time you restart the task, you should update the
+headers, you can use the appropriate method by adding or changing the headers
+of the received task.
+
+```php
+$task
+ ->withHeader('attempts', (int)$task->getHeaderLine('attempts') - 1)
+ ->withHeader('retry-delay', (int)$task->getHeaderLine('retry-delay') * 2)
+ ->fail('Something went wrong', requeue: true)
+;
+```
+
+In addition, you can re-specify the task execution delay. For example, in the
+code above, you may have noticed the use of a custom header `"retry-delay"`, the
+value of which doubled after each restart, so this value can be used to specify
+the delay in the next task execution.
+
+```php
+$task
+ ->withDelay((int)$task->getHeaderLine('retry-delay'))
+ ->fail('Something went wrong', true)
+;
+```
+
+### Received Task ID
+
+Each task in the queue has a **unique** identifier. This allows you to
+unambiguously identify the task among all existing tasks in all queues, no
+matter what name it was received from.
+
+In addition, it is worth paying attention to the fact that the identifier is not
+a sequential number that increases indefinitely. It means that there is still a
+chance of an identifier collision, but it is about 1/2.71 quintillion. Even if
+you send 1 billion tasks per second, it will take you about 85 years for an ID
+collision to occur.
+
+```php
+echo $task->getId();
+// Expected Result
+// string(36) "88ca6810-eab9-473d-a8fd-4b4ae457b7dc"
+```
+
+In the case that you want to store this identifier in the database, it is
+recommended to use a binary representation (16 bytes long if your DB requires
+blob sizes).
+
+```php
+$binary = hex2bin(str_replace('-', '', $task->getId()));
+// Expected Result
+// string(16) b"ˆÊh\x10ê¹G=¨ýKJäW·Ü"
+```
+
+### Received Task Queue
+
+Since a worker can process several different queues at once, you may need to
+somehow determine from which queue the task came. To get the name of the queue,
+use the `getQueue(): string` method.
+
+```php
+echo $task->getQueue();
+// Expected
+// string(13) "example-queue"
+```
+
+For example, you can select different task handlers based on different types of
+queues.
+
+```php
+// This is just an example of a handler
+$handler = $container->get(match($task->getQueue()) {
+ 'emails' => 'email-handler',
+ 'billing' => 'billing-handler',
+ default => throw new InvalidArgumentException('Unprocessable queue [' . $task->getQueue() . ']')
+});
+
+$handler->process($task);
+```
+
+### Received Task Name
+
+The task name is some identifier associated with a specific type of task. For
+example, it may contain the name of the task class so that in the future we can
+create an object of this task by passing the required data there. To get the
+name of the task, use the `getName(): string` method.
+
+```php
+echo $task->getName();
+// Expected
+// string(21) "App\\Queue\\Task\\EmailTask"
+```
+
+Thus, we can implement the creation of a specific task with certain data for
+this task.
+
+```php
+$class = $task->getName();
+
+if (!class_exists($class)) {
+ throw new InvalidArgumentException("Unprocessable task [$class]");
+}
+
+$handler->process($class::fromTask($task));
+```
+
+### Received Task Payload
+
+Each task contains a set of arbitrary user data to be processed within the task.
+To obtain this data, you can use one of the available methods:
+
+**getValue**
+
+Method `getValue()` returns a specific payload value by key or `null` if no
+value was passed. If you want to specify any other default value (for those
+cases when the payload with the identifier was not passed), then use the second
+argument, passing your own default value there.
+
+```php
+if ($task->getName() !== SendEmailTask::class) {
+ throw new InvalidArgumentException('Does not look like a mail task');
+}
+
+echo $task->getValue('email'); // "[email protected]"
+echo $task->getValue('username', 'Guest'); // "John"
+```
+
+**hasValue**
+
+To check the existence of any value in the payload, use the `hasValue()` method.
+This method will return `true` if the value for the payload was passed and `false`
+otherwise.
+
+```php
+if (!$task->hasValue('email')) {
+ throw new InvalidArgumentException('The "email" value is required for this task');
+}
+
+$email->sendTo($task->getValue('email'));
+```
+
+**getPayload**
+
+Also you can get all data at once in `array(string|int $key => mixed $value)`
+format using the `getPayload` method. This method may be useful to you in cases
+of transferring all data to the DTO.
+
+```php
+$class = $task->getName();
+$arguments = $task->getPayload();
+
+$dto = new $class(...$arguments);
+```
+
+You should pay attention that an array can contain both `int` and `string`
+keys, so you should take care of their correct pass to the constructor
+yourself. For example, the code above will work completely correctly only in the
+case of PHP >= 8.1. And in the case of earlier versions of the language, you
+should use the [reflection functionality](https://www.php.net/manual/ru/reflectionclass.newinstanceargs.php),
+or pass the payload in some other way.
+
+Since the handler process is not the one that put this task in the queue, then
+if you send any object to the queue, it will be serialized and then automatically
+unpacked in the handler. The default serializer suitable for most cases, so you
+can even pass `Closure` instances. However, in the case of any specific data
+types, you should manage their packing and unpacking yourself, either by
+replacing the serializer completely, or for a separate value. In this case, do
+not forget to specify this both on the client and consumer side.
+
+### Received Task Headers
+
+In the case that you need to get any additional information that is not related
+to the task, then for this you should use the functionality of headers.
+
+For example, headers can convey information about the serializer, encoding, or
+other metadata.
+
+```php
+$message = $task->getValue('message');
+$encoding = $task->getHeaderLine('encoding');
+
+if (strtolower($encoding) !== 'utf-8') {
+ $message = iconv($encoding, 'utf-8', $message);
+}
+```
+
+The interface for receiving headers is completely similar to
+[PSR-7](https://www.php-fig.org/psr/psr-7/), so methods are available to you:
+- `getHeaders(): array<string, array<string, string>>` - Retrieves all task
+ header values.
+- `hasHeader(string): bool` - Checks if a header exists by the given name.
+- `getHeader(string): array<string, string>` - Retrieves a message header value
+ by the given name.
+- `getHeaderLine(string): string` - Retrieves a comma-separated string of the
+ values for a single header by the given name.
+
+We got acquainted with the data and capabilities that we have in the consumer.
+Let's now get down to the basics - sending these messages.
+
+## Advanced Functionality
+
+In addition to the main functionality of queues for sending and processing in
+API has additional functionality that is not directly related to these tasks.
+After we have examined the main functionality, it's time to disassemble the
+advanced features.
+
+### Creating A New Queue
+
+In the very [first chapter](/beep-beep/jobs.md#configuration), we got acquainted
+with the queue settings and drivers for them. In approximately the same way, we
+can do almost the same thing with the help of the PHP code using `create()`
+method through `Jobs` instance.
+
+To create a new queue, the following types of DTO are available to you:
+
+- `Spiral\RoadRunner\Jobs\Queue\AMQPCreateInfo` for AMQP queues.
+- `Spiral\RoadRunner\Jobs\Queue\BeanstalkCreateInfo` for Beanstalk queues.
+- `Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo` for in-memory queues.
+- `Spiral\RoadRunner\Jobs\Queue\SQSCreateInfo` for SQS queues.
+
+Such a DTO with the appropriate settings should be passed to the `create()`
+method to create the corresponding queue:
+
+```php
+use Spiral\RoadRunner\Jobs\Jobs;
+use Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo;
+
+$jobs = new Jobs();
+
+//
+// Create a new "example" in-memory queue
+//
+$queue = $jobs->create(new MemoryCreateInfo(
+ name: 'example',
+ priority: 42,
+ prefetch: 10,
+));
+```
+
+### Getting A List Of Queues
+
+In that case, to get a list of all available queues, you just need to use the
+standard functionality of the `foreach` operator. Each element of this collection
+will correspond to a specific queue registered in the RoadRunner. And to simply
+get the number of all available queues, you can pass a `Job` object to the
+`count()` function.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+
+foreach ($jobs as $queue) {
+ var_dump($queue->getName());
+ // Expects name of the queue
+}
+
+$count = count($jobs);
+// Expects the number of a queues
+```
+
+### Pausing A Queue
+
+In addition to the ability to create new queues, there may be times when a queue
+needs to be suspended for processing. Such cases can arise, for example, in the
+case of deploying a new application, when the processing of tasks should be
+suspended during the deployment of new application code.
+
+In this case, the code will be pretty simple. It is enough to call the `pause()`
+method, passing the names of the queues there. In order to start the work of
+queues further (unpause), you need to call a similar `resume()` method.
+
+```php
+$jobs = new Spiral\RoadRunner\Jobs\Jobs();
+
+// Pause "emails", "billing" and "backups" queues.
+$jobs->pause('emails', 'billing', 'backups');
+
+// Resuming only "emails" and "billing".
+$jobs->resume('emails', 'billing');
+```
+
+## RPC Interface
+
+All communication between PHP and GO made by the RPC calls with protobuf payloads.
+You can find versioned proto-payloads here: [Proto](https://github.com/spiral/roadrunner/blob/e9713a1d08a93e2be70c889c600ed89f54822b54/proto/jobs/v1beta).
+
+- `Push(in *jobsv1beta.PushRequest, out *jobsv1beta.Empty) error` - The
+ arguments: the first argument is a `PushRequest`, which contains one field
+ of the `Job` being sent to the queue; the second argument is `Empty`, which
+ means that the function does not return a result (returns nothing). The error
+ returned if the request fails.
+
+- `PushBatch(in *jobsv1beta.PushBatchRequest, out *jobsv1beta.Empty) error` -
+ The arguments: the first argument is a `PushBatchRequest`, which contains one
+ repeated (list) field of the `Job` being sent to the queue; the second
+ argument is `Empty`, which means that the function does not return a result.
+ The error returned if the request fails.
+
+- `Pause(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The arguments:
+ the first argument is a `Pipelines`, which contains one repeated (list)
+ field with the `string` names of the queues to be paused; the second
+ argument is `Empty`, which means that the function does not return a result.
+ The error returned if the request fails.
+
+- `Resume(in *jobsv1beta.Pipelines, out *jobsv1beta.Empty) error` - The
+ arguments: the first argument is a `Pipelines`, which contains one
+ repeated (list) field with the `string` names of the queues to be resumed; the
+ second argument is `Empty`, which means that the function does not return a
+ result. The error returned if the request fails.
+
+- `List(in *jobsv1beta.Empty, out *jobsv1beta.Pipelines) error` - The
+ arguments: the first argument is an `Empty`, meaning that the function does
+ not accept anything (from the point of view of the PHP API, an empty string
+ should be passed); the second argument is `Pipelines`, which contains one
+ repeated (list) field with the `string` names of the all available queues.
+ The error returned if the request fails.
+
+- `Declare(in *jobsv1beta.DeclareRequest, out *jobsv1beta.Empty) error` - The
+ arguments: the first argument is an `DeclareRequest`, which contains one
+ `map<string, string>` pipeline field of queue configuration; the second
+ argument is `Empty`, which means that the function does not return a result.
+ The error returned if the request fails.
+
+- `Stat(in *jobsv1beta.Empty, out *jobsv1beta.Stats) error` - The arguments:
+ the first argument is an `Empty`, meaning that the function does not accept
+ anything (from the point of view of the PHP API, an empty string should be
+ passed); the second argument is `Stats`, which contains one repeated (list)
+ field named `Stats` of type `Stat`. The error returned if the request fails.
+
+
+From the PHP point of view, such requests (`List` for example) are as follows:
+```php
+use Spiral\Goridge\RPC\RPC;
+use Spiral\Goridge\RPC\Codec\ProtobufCodec;
+use Spiral\RoadRunner\Jobs\DTO\V1\Maintenance;
+
+$response = RPC::create('tcp://127.0.0.1:6001')
+ ->withServicePrefix('jobs')
+ ->withCodec(new ProtobufCodec())
+ ->call('List', '', Maintenance::class);
+```