Diffstat (limited to 'plugins/jobs')
-rw-r--r--  plugins/jobs/config.go                          62
-rw-r--r--  plugins/jobs/doc/jobs_arch.drawio                1
-rw-r--r--  plugins/jobs/drivers/amqp/config.go             63
-rw-r--r--  plugins/jobs/drivers/amqp/consumer.go          472
-rw-r--r--  plugins/jobs/drivers/amqp/item.go              228
-rw-r--r--  plugins/jobs/drivers/amqp/listener.go           25
-rw-r--r--  plugins/jobs/drivers/amqp/plugin.go             40
-rw-r--r--  plugins/jobs/drivers/amqp/rabbit_init.go        65
-rw-r--r--  plugins/jobs/drivers/amqp/redial.go            141
-rw-r--r--  plugins/jobs/drivers/beanstalk/config.go        53
-rw-r--r--  plugins/jobs/drivers/beanstalk/connection.go   206
-rw-r--r--  plugins/jobs/drivers/beanstalk/consumer.go     317
-rw-r--r--  plugins/jobs/drivers/beanstalk/encode_test.go   75
-rw-r--r--  plugins/jobs/drivers/beanstalk/item.go         147
-rw-r--r--  plugins/jobs/drivers/beanstalk/listen.go        39
-rw-r--r--  plugins/jobs/drivers/beanstalk/plugin.go        47
-rw-r--r--  plugins/jobs/drivers/ephemeral/consumer.go     244
-rw-r--r--  plugins/jobs/drivers/ephemeral/item.go         115
-rw-r--r--  plugins/jobs/drivers/ephemeral/plugin.go        41
-rw-r--r--  plugins/jobs/drivers/sqs/config.go             114
-rw-r--r--  plugins/jobs/drivers/sqs/consumer.go           376
-rw-r--r--  plugins/jobs/drivers/sqs/item.go               247
-rw-r--r--  plugins/jobs/drivers/sqs/listener.go            87
-rw-r--r--  plugins/jobs/drivers/sqs/plugin.go              39
-rw-r--r--  plugins/jobs/job/general.go                     29
-rw-r--r--  plugins/jobs/job/job_options.go                 32
-rw-r--r--  plugins/jobs/job/job_options_test.go            45
-rw-r--r--  plugins/jobs/pipeline/pipeline.go               90
-rw-r--r--  plugins/jobs/pipeline/pipeline_test.go          21
-rw-r--r--  plugins/jobs/plugin.go                         573
-rw-r--r--  plugins/jobs/protocol.go                        78
-rw-r--r--  plugins/jobs/response_protocol.md               54
-rw-r--r--  plugins/jobs/rpc.go                            136
33 files changed, 4302 insertions(+), 0 deletions(-)
diff --git a/plugins/jobs/config.go b/plugins/jobs/config.go
new file mode 100644
index 00000000..454256b9
--- /dev/null
+++ b/plugins/jobs/config.go
@@ -0,0 +1,62 @@
+package jobs
+
+import (
+ "runtime"
+
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+const (
+ // name is the key used to set the pipeline name
+ pipelineName string = "name"
+)
+
+// Config defines settings for job broker, workers and job-pipeline mapping.
+type Config struct {
+ // NumPollers configures number of priority queue pollers
+ // Should be no more than 255
+ // Default - num logical cores
+ NumPollers uint8 `mapstructure:"num_pollers"`
+
+ // PipelineSize is the limit of the main jobs queue, which consumes Items from the driver's pipeline
+ // The driver's pipeline might be much larger than the main jobs queue
+ PipelineSize uint64 `mapstructure:"pipeline_size"`
+
+ // Timeout in seconds is the per-push time limit for inserting a job into the queue
+ Timeout int `mapstructure:"timeout"`
+
+ // Pool configures roadrunner workers pool.
+ Pool *poolImpl.Config `mapstructure:"Pool"`
+
+ // Pipelines defines mapping between PHP job pipeline and associated job broker.
+ Pipelines map[string]*pipeline.Pipeline `mapstructure:"pipelines"`
+
+ // Consuming specifies names of pipelines to be consumed on service start.
+ Consume []string `mapstructure:"consume"`
+}
+
+func (c *Config) InitDefaults() {
+ if c.Pool == nil {
+ c.Pool = &poolImpl.Config{}
+ }
+
+ if c.PipelineSize == 0 {
+ c.PipelineSize = 1_000_000
+ }
+
+ if c.NumPollers == 0 {
+ c.NumPollers = uint8(runtime.NumCPU())
+ }
+
+ for k := range c.Pipelines {
+ // set the pipeline name
+ c.Pipelines[k].With(pipelineName, k)
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = 60
+ }
+
+ c.Pool.InitDefaults()
+}
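For illustration, a minimal sketch of how an empty config resolves after InitDefaults; it assumes the package is importable under the path shown in the diff header, and the expected values simply mirror the method body above:

    package main

    import (
        "fmt"
        "runtime"

        "github.com/spiral/roadrunner/v2/plugins/jobs"
    )

    func main() {
        cfg := &jobs.Config{}
        cfg.InitDefaults()

        fmt.Println(cfg.NumPollers == uint8(runtime.NumCPU())) // true (machines with <= 255 logical cores)
        fmt.Println(cfg.PipelineSize == 1_000_000)             // true
        fmt.Println(cfg.Timeout == 60)                         // true, seconds
    }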
diff --git a/plugins/jobs/doc/jobs_arch.drawio b/plugins/jobs/doc/jobs_arch.drawio
new file mode 100644
index 00000000..aaed82c7
--- /dev/null
+++ b/plugins/jobs/doc/jobs_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-07-09T07:14:41.096Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/91.0.4472.124 Electron/13.1.6 Safari/537.36" etag="0gh7yhPcQUpxg5xU25Ad" version="14.6.13" type="device"><diagram id="AFQlLRRq6yGg9IpTzkrs" name="Page-1">7R1pc9q69tcwk96ZZGzL68csNLdt2tCQtM398sZgEdwYTG2ThP76J8mysaUDmGADWbhzG5A3+eybjlrodPR0HrmT4dfQw0FLU7ynFjpraZqKDI38oSOzdMSyzXTgLvI9ftJ8oOv/xXxQ4aNT38Nx6cQkDIPEn5QH++F4jPtJacyNovCxfNogDMpPnbh3WBro9t1AHv3pe8kwmx35zI/8i/27YSIdGrnZ+XwgHrpe+FgYQu0WOo3CMEm/jZ5OcUDhl4Emve7jgqP53CI8Tqpc0O3E5u3p15n7OdSH6t23b+376FDnk3twgyl/6avLy2sy8vnypEv+dC5uzj9942+QzDLIROF07GF6Z6WFTh6HfoK7E7dPjz4SWiBjw2QUkF8q+ToIx8lHd+QHlAy6H8mdvobjkBxwA/9uTMaiFIIngdvDQSeM/cQP6XiAB3T4AUeJT7ByIRxOwknh6DG/WS9MknBEH+sHwWkYhBGbMhoMsNnvk/E4icJ7XDjiWU5Poe8hg5RDmT4DPxWGOIjPcTjCSTQjp/CjumJpR0Z6Fad61Tb5wOOciDRTT8eGRfLRHU68nHDv8vvPEUu+cNzCeP41Hj/9q5LrHv4mY+v8O/5P6R9qEgaxR0id/wyjZBjehWM3aM9HT8o4np9zEVK4M8z+xkky43zrTpOwjHcCxWj2i15PIMJ/3haPnT3xm6e/ZvzXQnpZiJ84nEZ9vOz9OQASN7rDyZIT+Q0pdJaiO8KBm/gPZUEBoY1f2gl9MuecTBxbK9GIpdgZ1WQ3SafKryvytXgryyrfSpNulYJHutVxFLmzwmkTekIs0Vr+1s8nP7QT8nvykwL1kV+3GbmR73Paoz9mRULcD5JVK1KsviWKFchMd6pSbF1ElMnigqrqkDsph1RNTeMhuSn9c+Im/eGGCqusOTwD254OaQ5b6yHT5PgvjCvs8yy6qK5qbAEjhpFjpKhoMoOkqGh0uyE9o75z+vqcrlfVTea2WN08sh3TMFRL01VkOSUyQ0aDjH/5t4O+DNH/HpxDEyXnXYWYnxnj74imihSV0xdMU6LFafcxbHH2bEM35hangNBmqSg1heonGkUwQ1StIpksNkPgJ1mmoIksnT9pkYkkXjGf25xS01nUqrB0SWGdYzqriT/BgT/GtSoprBI1ZUHE5pgWcnelpBxVQBaopFQNUFIINaSkDAkvLc0MqKc5KeHD/DOlzvnJiNCtT1zLY3JUmTyRfxlslHT8kLmh9JheOEbAkxxy/5Ye6xMQ4qh82MP9MHJTZ5aeQ5EfpYSRPZp8u+N/2QR72cDnsEfActxNomk/mUY4O4EApCdeRMYm4tgwEkee9e6ph04P2vSgOG0yS3bFGbt35I/v8hfJn99xZ0HoegvOg+bexEw/eZhJjOVzvZxQbMX5eflvaLICc4txijR6kYVAeKgjJGcNAhYmovzNmZMrKVVbxqyLRcPAoP9BosFkH1k0pJ96JEDO27PMIEUy/5sA/6ua3pAAMF+BAMhIb+9lQCfXdjtl8E7kh5GfzPJpTImJQINwIo+fETtotmCy87OOkwSPJslcFDCDY7uvdEWZoTzdwix6Ena3Na9rf4TDafKcSb3LTFBmIlupKDNRUzIzs+PePfs1fLLMY1/t2tv74Npr5tZjerIqTn2kPtFt0xFTmAdeRF44+sDe/s6PiRrF1FwchPRoMsQll0rpzcq/x+7oTXhatlNOKSAN9LQsQGqYjQkN511orC807IpCI1MSuxUaapPxwKUAKgiNr5g8kQz9Zg5nmLtpjz4hfEFuFGQDP/GIoo3fIRwHFGTUuJvJN6xRjAwGA21BPtrsmcbOxIhRDthoKphV0CExojQkRjS5SoFmeg4oJlvaaYomqh+IJCCPJ/+fUJBGteLLc7E9APFl9m3cG+yJ2FcVrarYN8ym8CWn6hYghLrBZahnxj33l+X6jpHveamewLH/1+2xW1Eo87gxua9x0jLO6L2Iaoi5HyAhbhyOccO4cRyxEkQBcGMYgB1vNYUaTUKNhBTiu1H5eJpXVSkybZP/PtLHntxFrufj+bEMrGXmyU8HceC58TDnykXVPnldz3KnMKcboQBJ9AczG4Q+0o0n6YsO/Cc6j5SUcNR+wClFMeoZuhN6wejpjla7HbmPsX40jdmzGiUiZJeIyLQhBrdsmYisGrK8f82B94jsj/r96Pj2fPrUsZx/s3KOncTPgMCYO6IEEMi/GBIHVL5nw+VLypE1KTSRB9FOC85IjwmyDzuKtYGgkqIyy+DIzpKjY1SdprbOPzGLJ8ZHn1OdiqOIOlo1h5CaepErbukdFKy8f7LvR1ks8uW9FyfB1/ZaFwRZB3Taqc6WX+nFvMkWwq6NMb9L1FiRstKg94cX8wJXWOSNpW8gmDuZYo+YDVC0G+TIMuQyLFT+ex1SNiw5pIwgL0G1a8jDg2bEe3BoaXAIhJkhx4bA82qPJ2+EabnionPT/XdD97xBm1s1BW5xdCgDA5VWohqCIMswWoDhn6PftAqE2MZDl7hqwT7DUxXgiRwAnmAVUA3wjK/Oesrv+98/rfvvXffX6U/z+iYrAnhB0kctyp6CKGpE+lw5Nxex8dDt2sF/t7+02P3pX0A16SBsm6lJX7dQUHWssuusGrZVpBv5CjMrTnn+FaZmCbS5WWnhMkQUhMG34+vuhgLgWWku2PJZEAitQ5Q4mgjwrPK0KEpUQJToTYkSe5eiRC0KkhUFynNDZpuiBITZzkTJRph+eSbrVpXGRphG+6E0NFG+IHt5dTmRQBtfYaj1Kg0YEcr+UK/6gql3mUbeO+q1neUGjEyL619Rt8kDI2K3y4COrNdCvuB52p6QryESo76CGJXNr7DQNshXstgLFcnfb9o37Vqtd9NVlMEAst6RpSinp7KRPmCfpgMBsuyArHcoENCY9b5Hha0VxcpWw5DLxMULs941iQe/tr9eXt3Wynkvxm+21F37zcb+cN5yb6oBDkK74qCaQmm6uiIwpouSXt+GjyOXphx//d55mwyuO7tmcHkFdAPdpYQyTVtRLARh59hQFB0oZduO4YPsKoYPtKanMeyosj78NJoQkUqx0R9iYN3E667XVFVUTlOpKGtoUkr7WTKS6ijYhJEkSzQCkYDWKpC3jcIH38ORXJn+yhFlKkJlrWEC+URkNoMosJWe7OCpdF3BHFkuuweXfAXMLaob8vwHcSieuONsTGN3T2/6aewnPkHw33QFA7UXCDApwHzG0Om6B/5IOpU
xHYgx+8OLXVu8fD5dUNX5XqidKT62MAxMcMWcEZ1zhy0N9fMpe2VwFGcxcietfBHHPZ7FKeLYYZ/OmC7uitnd8mv64Xjg35EvB+7ozyRdHtDD7jhO3OA+/YmT/occCgxh9B6HDFt5pWexeHQdQFRmRKG74bbZUNCCH9mnJvaUopCO4kjs6TiAHNVrWIoGsqes68qmSLd99aP9VhWfZuqi4kOAPLXQNuXpjuO9m8dl5pGYJuIyINAArxI8jyNt+3GZZbMu8Cbnxo38glVdZ3PGfW7brs1YLkuzzN01oE0sXBLVFMvtUUnUi+E4oO3aMhrfE46THfN9yEZs206ReRBlfXm3kY1YJpmLmLkE4lfDcNSbxmsjgkY6+gqEiMFg1WJVSW7WgQGxP5xqGLIUtLeJALnOlrz9+nKxusGfL2wk7tgx7VpP7znB43SEy037eTh7bib6+fLQqlkeckLJ3OsNWxkgVTHE9uyGJayDrqlPJVLmy33nz1peGQRdo9u6QNSbxc1BdDgS1c+wHFWqk+zTZbtN0vyaxkHzNJ/F3rdM9JpjmCJRmZbgaNfXswN+9Xfbcn26yuhlNWEZe2VdZvMuSJMfxxcb25SVDZMlfdoXdHZvxJg07YqlLXU0bYMRIUe9OiENqRJZyxtPUppIphHrljMK4zSrkLsC6YVvLRimSVu4GJocDkMqtIVLY+EwOUjy0/WTN4ojpDhqFRzZW0WR7FX/SHGj+GmSwsPExvFY2oXmIeTcjz+g82dt6zBLjByffslG+jT3oLgeHWaIX5E6AtZnL162XsxsFBIw+fxfeTKkDrHhSPJflZMemgo0amks6ZFtwPZuda1jdQFrrOEgWu37cWwmfuRV1j8vr760r1ilzeXlxbbMLyDmZIO+HDKRg7z6zC9RI1iOWtEAM5rCiRzOa/9qn95c01TjwZ8jL2TdT4rp62kUtVjePvjwxpS65piSn2plWxUVZWhTHdNgFMoBwTPWkJaq9C6OHli/kbeYNUa2JWaNjZwHiyxnADqvOXzt0RrNfaznh6sjqkYaUDP1yFLMStcUXTLwUbVAbV0xK6DtaMr5rbxcZDcKdSsJal3VJQzomcmzsxR1TtEvlrt3YNFqWkXurr+j9GbIBsJXUpqa93Ic9+IJ5HJulLZWXJqfBBhNO7PM5VmIRUXmDXEmMoFC5a0mrjU5hfSGA1R6vu93rr2AfY+2Gp5CcgSx48YxCzOxTRNo4+y8GPiNYUtc3oTyTQXLlqy6TYTtUc6sYreR3es6VLkga79yZkgOHtPmqJhFBrK29ulOKIe0XbU/8Ptk5M8UA7K1ITvU82mPzLQd9yOOE1ljVmiS35ACNHSgYBlSgHXsUgCHA99usHX9DbM3YPCq4Vnk7BeDA00wrz5dXn26vp0bs0qDfTFXuZUVrd0tGrWWUrEiWq9hJwsYaXL89gr3MSEWj4tkbtymq42OzvCDn668gvN7bMuqz5e0tfHBCI/CaPYhPZUdcMcUIP1wTMHGDKaGk3yE19gt8g0k09f58J7lW2wpquI63szxKUaooUbKjWX5kByhpv38W1lKISNRvo6uUN5RyPROVtV/v3oPQPDXrOwmxZIPe6v2v+xRs1pwZRCFoxxxdKVmo/qCM62gLXqq5w3ASnJVsZCDm9MKjmocaVUVg7Q3WX3m3uJdUChIM+n4OW3R/OizO/YYxi47nfZZq5T244sRJ8GUNtCfy9nird4aSyJDFzCPZJbUsv2vtsKSuuylVd8RId3YQJn/u4u9bD4lmMqOg4K8Zz5kYzvawFs4pEkKZSl40hFpp/YzljJ/7i7SdU8nC0izSZX3lN7dpE5Cb1bYVqU3S56xkUrdkzolsoLcks3roDgxtiSfblvzjB1H6p7kcf/+YINtdOqezjd39XwEpdDYdiYr05I73d8EmYL9Bu6DqCmgtmhMXcjbG3NEihbbq13PZ1ugtbbVJX267MW/r+lrVQ+56cA25pd/O+jLEP3vwTk0UXLeVS5uziu3Fah7UZ/Ytsl2BKu/tiV9opeYPWnxgj7hCiKBHIGgG1jOp8shgff1fK+F3jXHsAWaEpMaNdE7eZIgzrOBhd2V5bnxAspG6T3Lyxb7PsbMchLLQ0QHKV1VwfzycXjIFkcNWDA2DmlBJ0+5vbUGa5olilQHKsTVgEJc1FQCxZB978w8fkdqJRNZsUWkIiB2vV2kAjFOITJBHxvcyfba7pFVB6PZitgAY14Ev6pvax1hrqf48Xr64+EPunBubi87ivX552jJBs+Lkk7rbZJaTCqxLonZtr1F3lQm/sRnG3rGrfXyUU1NNW25mLYITOe37nq4pmbGGit2Eybp+vkED7yQSUCqnz0c4GTtzF5T89VZI0gGv8J85zHJvZikkU2SbypbCJkeeHjgTgMKWMqYNILFtoKncB6Qs719AbR5tMgEqju3+4KELlJtsXzWVqFokQEIXaeGKAXoTGiS0G2nsSKFrQqmEQslnvaHXPKkRLmn6JFwAWBsIXoss6wSdR3QhzqAGq0p1EA9mPNGsbRvLFvf7TLXLlVkrD5ENkwLW1TzbPy8rG8NZG6njqIGVKqKVcYlMqCwLNj3vI6dX0FsbpLDq7qruSixOfuVDnu4H/IuHOwcmrSP+Kb3i3aXz9NOOSE1k7Vb8c7z3dptKG1B+zYbJzxFZ5zNc5fWScs6q5TGgMNLAgPIGQ3qP/Fwk6ot86eWxNQ3zWNsxjHiuoEFeQygFa+h1lHsALJM5mns/xani3bYmEcRW8+ImgsRvpVhRRCIQOEqDGwbppBNw4ZiKBqZxpGCdN3WLMM2NUXwJutbXbkMGgUpzDtxU9XKKhJDIEKzVmnTggzXZvzpKAIUUcVapDoKz0FIyknG0rJ093VaiaqJxNWpkKR0tmknynnGM9i0W5dwn1tgvxGEpY5mxNnVq9F6Y7Zb1uRk/xXRIoWjVdkFdPEeUbVoIruiJjI2VEQboVpOYbYntMtG5Abr8tMikDaiIExBQWiwywOzTXMWnP7SGWcP+MZ5CXwjZ1TALfD2mWUQcWWqlnhrVnM8s9MFuHXwjLUHTAN1sd0/rgF61p60j791r48vvrwo3tGBBvow4zTGNrLm5vuU0cUQuXu3z1Clbl5ZIunKkalVA6xuH1nrw5b8jEIaRZs72QQEw6+hh+kZ/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
new file mode 100644
index 00000000..73482d4d
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -0,0 +1,63 @@
+package amqp
+
+// pipeline rabbitmq configuration keys
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+ multipleAsk string = "multiple_ask"
+ requeueOnFail string = "requeue_on_fail"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ Prefetch int `mapstructure:"prefetch"`
+ Queue string `mapstructure:"queue"`
+ Priority int64 `mapstructure:"priority"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
+ Exclusive bool `mapstructure:"exclusive"`
+ MultipleAck bool `mapstructure:"multiple_ask"`
+ RequeueOnFail bool `mapstructure:"requeue_on_fail"`
+}
+
+func (c *Config) InitDefault() {
+ // all options should be in sync with the pipeline defaults in the FromPipeline method
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "amqp.default"
+ }
+
+ if c.Prefetch == 0 {
+ c.Prefetch = 10
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:[email protected]:5672/"
+ }
+}
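The x-dead-letter-* keys above are the building blocks of the delayed-delivery emulation used in consumer.go below. A hedged sketch of the queue arguments they end up in (the exchange name matches the default above; the routing key and TTL values are hypothetical):

    package main

    import (
        "fmt"

        amqp "github.com/rabbitmq/amqp091-go"
    )

    func main() {
        delayMs := int64(5000) // hypothetical 5-second delay

        args := amqp.Table{
            "x-dead-letter-exchange":    "amqp.default", // dlx
            "x-dead-letter-routing-key": "jobs",         // dlxRoutingKey (hypothetical)
            "x-message-ttl":             delayMs,        // dlxTTL
            "x-expires":                 delayMs * 2,    // dlxExpires
        }
        fmt.Println(args)
    }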
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
new file mode 100644
index 00000000..429953e1
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -0,0 +1,472 @@
+package amqp
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ log logger.Logger
+ pq priorityqueue.Queue
+ eh events.Handler
+
+ pipeline atomic.Value
+
+ // amqp connection
+ conn *amqp.Connection
+ consumeChan *amqp.Channel
+ publishChan chan *amqp.Channel
+ consumeID string
+ connStr string
+
+ retryTimeout time.Duration
+ // AMQP QoS prefetch
+ prefetch int
+ // pipeline's priority
+ priority int64
+ exchangeName string
+ queue string
+ exclusive bool
+ exchangeType string
+ routingKey string
+ multipleAck bool
+ requeueOnFail bool
+
+ delayCache map[string]struct{}
+
+ listeners uint32
+ stopCh chan struct{}
+}
+
+// NewAMQPConsumer initializes rabbitmq pipeline
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_amqp_consumer")
+ // we need to obtain two parts of the amqp configuration here:
+ // first part - the address to connect to, located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+ // if there is no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION START -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
+
+ jb := &JobConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO: move to the config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ priority: pipeCfg.Priority,
+
+ publishChan: make(chan *amqp.Channel, 1),
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pch, err := jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan <- pch
+
+ // run redialer and requeue listener for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_amqp_consumer_from_pipeline")
+ // we need to obtain two parts of the amqp configuration here:
+ // first part - the address to connect to, located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+
+ // only global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ jb := &JobConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+
+ publishChan: make(chan *amqp.Channel, 1),
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, false),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pch, err := jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan <- pch
+
+ // register the pipeline
+ // error here is always nil
+ _ = jb.Register(context.Background(), pipeline)
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
+ const op = errors.Op("rabbitmq_push")
+ // check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != job.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
+ }
+
+ err := j.handleItem(ctx, fromJob(job))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+// handleItem publishes the item, using a temporary dead-letter queue to emulate delayed delivery
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // publish the message to the temporary delayed queue
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ j.delayCache[tmpQ] = struct{}{}
+
+ return nil
+ }
+
+ // publish the message to the exchange with the configured routing key
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ j.pipeline.Store(p)
+ return nil
+}
+
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // run listener
+ j.listener(deliv)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ err := j.consumeChan.Cancel(j.consumeID, true)
+ if err != nil {
+ j.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := j.consumeChan.Close()
+ if errCl != nil {
+ j.log.Error("force close failed", "error", err)
+ return
+ }
+ return
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ var err error
+ j.consumeChan, err = j.conn.Channel()
+ if err != nil {
+ j.log.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
+
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
+ if err != nil {
+ j.log.Error("qos set failed", "error", err)
+ return
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ j.log.Error("consume operation failed", "error", err)
+ return
+ }
+
+ // run listener
+ j.listener(deliv)
+
+ // increase number of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (j *JobConsumer) Stop(context.Context) error {
+ j.stopCh <- struct{}{}
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
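A self-contained sketch of the delay arithmetic and temporary-queue naming used by handleItem above; the exchange and queue names are placeholders:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        delay := 30 * time.Second
        delayMs := int64(delay.Seconds() * 1000)

        // mirrors the tmpQ naming in handleItem
        tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, "amqp.default", "jobs")

        fmt.Println(tmpQ)        // delayed-30000.amqp.default.jobs
        fmt.Println(delayMs * 2) // x-expires: the temporary queue is dropped after twice the TTL
    }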
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
new file mode 100644
index 00000000..5990d137
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -0,0 +1,228 @@
+package amqp
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ json "github.com/json-iterator/go"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains the name of the job handler (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job, provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers contains key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority, default - 10
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ ack func(multiple bool) error
+
+ // Nack negatively acknowledges the delivery of message(s) identified by the delivery tag.
+ // When multiple is true, all messages up to and including the delivery tag are nacked on the same channel.
+ // When requeue is true, the server is requested to deliver this message to a different consumer. If that is not possible or requeue is false, the message will be dropped or delivered to a server-configured dead-letter queue.
+ // This method must not be used to select or requeue messages the client wishes not to handle; rather, it informs the server that the client is incapable of handling this message at this time.
+ nack func(multiple bool, requeue bool) error
+
+ // requeueFn used as a pointer to the push function
+ requeueFn func(context.Context, *Item) error
+
+ multipleAsk bool
+ requeue bool
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used by the amqp driver; amqp.Table is used instead
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ return i.Options.ack(i.Options.multipleAsk)
+}
+
+func (i *Item) Nack() error {
+ return i.Options.nack(false, i.Options.requeue)
+}
+
+// Requeue with the provided delay, handled by the Nack
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errNack := i.Options.nack(false, true)
+ if errNack != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
+ }
+
+ return err
+ }
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
+func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ item, err := j.unpack(d)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ i := &Item{
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ }
+
+ item.Options.ack = d.Ack
+ item.Options.nack = d.Nack
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
+ return i, nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
+
+// pack job metadata into headers
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
+ return amqp.Table{
+ job.RRID: id,
+ job.RRJob: j.Job,
+ job.RRPipeline: j.Options.Pipeline,
+ job.RRHeaders: headers,
+ job.RRDelay: j.Options.Delay,
+ job.RRPriority: j.Options.Priority,
+ }, nil
+}
+
+// unpack converts an amqp.Delivery into an Item and restores its Options
+func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: j.multipleAck,
+ requeue: j.requeueOnFail,
+ requeueFn: j.handleItem,
+ }}
+
+ if _, ok := d.Headers[job.RRID].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
+ }
+
+ item.Ident = d.Headers[job.RRID].(string)
+
+ if _, ok := d.Headers[job.RRJob].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
+ }
+
+ item.Job = d.Headers[job.RRJob].(string)
+
+ if _, ok := d.Headers[job.RRPipeline].(string); ok {
+ item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
+ }
+
+ if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &item.Headers)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if _, ok := d.Headers[job.RRDelay].(int64); ok {
+ item.Options.Delay = d.Headers[job.RRDelay].(int64)
+ }
+
+ if _, ok := d.Headers[job.RRPriority]; !ok {
+ // set pipe's priority
+ item.Options.Priority = j.priority
+ } else {
+ item.Options.Priority = d.Headers[job.RRPriority].(int64)
+ }
+
+ return item, nil
+}
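pack/unpack above round-trip the Headers map through a single JSON-encoded AMQP header. A sketch of that round trip; the "rr_headers" key here is a stand-in for the job.RRHeaders constant, whose value is not shown in this diff:

    package main

    import (
        "encoding/json"
        "fmt"

        amqp "github.com/rabbitmq/amqp091-go"
    )

    func main() {
        headers := map[string][]string{"Trace-Id": {"abc"}}

        // producer side: pack
        raw, err := json.Marshal(headers)
        if err != nil {
            panic(err)
        }
        table := amqp.Table{"rr_headers": raw} // key name is hypothetical

        // consumer side: unpack
        var restored map[string][]string
        if h, ok := table["rr_headers"].([]byte); ok {
            if err := json.Unmarshal(h, &restored); err != nil {
                panic(err)
            }
        }
        fmt.Println(restored["Trace-Id"][0]) // abc
    }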
diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go
new file mode 100644
index 00000000..0b1cd2dc
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/listener.go
@@ -0,0 +1,25 @@
+package amqp
+
+import amqp "github.com/rabbitmq/amqp091-go"
+
+func (j *JobConsumer) listener(deliv <-chan amqp.Delivery) {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case msg, ok := <-deliv:
+ if !ok {
+ j.log.Info("delivery channel closed, leaving the rabbit listener")
+ return
+ }
+
+ d, err := j.fromDelivery(msg)
+ if err != nil {
+ j.log.Error("amqp delivery convert", "error", err)
+ continue
+ }
+ // insert job into the main priority queue
+ j.pq.Insert(d)
+ }
+ }
+ }()
+}
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go
new file mode 100644
index 00000000..624f4405
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/plugin.go
@@ -0,0 +1,40 @@
+package amqp
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "amqp"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline constructs AMQP driver from pipeline
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
new file mode 100644
index 00000000..570498e9
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/rabbit_init.go
@@ -0,0 +1,65 @@
+package amqp
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+)
+
+func (j *JobConsumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ j.exchangeName,
+ j.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ j.queue,
+ false,
+ false,
+ j.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ j.routingKey,
+ j.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventInitialized,
+ Driver: "amqp",
+ Start: time.Now(),
+ })
+ return channel.Close()
+}
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
new file mode 100644
index 00000000..8dc18b8f
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -0,0 +1,141 @@
+package amqp
+
+import (
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+// redialer reconnects to rabbitmq when the connection is interrupted
+func (j *JobConsumer) redialer() { //nolint:gocognit
+ go func() {
+ const op = errors.Op("rabbitmq_redial")
+
+ for {
+ select {
+ case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ if err == nil {
+ return
+ }
+
+ j.Lock()
+
+ // trash the broken publishing channel
+ <-j.publishChan
+
+ t := time.Now()
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeError,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Error: err,
+ Start: time.Now(),
+ })
+
+ expb := backoff.NewExponentialBackOff()
+ // limit the total retry time to retryTimeout
+ expb.MaxElapsedTime = j.retryTimeout
+ operation := func() error {
+ j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ var dialErr error
+ j.conn, dialErr = amqp.Dial(j.connStr)
+ if dialErr != nil {
+ return errors.E(op, dialErr)
+ }
+
+ j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+
+ // re-init connection
+ errInit := j.initRabbitMQ()
+ if errInit != nil {
+ j.log.Error("rabbitmq dial", "error", errInit)
+ return errInit
+ }
+
+ // redeclare consume channel
+ var errConnCh error
+ j.consumeChan, errConnCh = j.conn.Channel()
+ if errConnCh != nil {
+ return errors.E(op, errConnCh)
+ }
+
+ // redeclare publish channel
+ pch, errPubCh := j.conn.Channel()
+ if errPubCh != nil {
+ return errors.E(op, errPubCh)
+ }
+
+ // start reading messages from the channel
+ deliv, err := j.consumeChan.Consume(
+ j.queue,
+ j.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // put the fresh publishing channel
+ j.publishChan <- pch
+ // restart listener
+ j.listener(deliv)
+
+ j.log.Info("queues and subscribers redeclared successfully")
+
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ j.Unlock()
+ j.log.Error("backoff failed", "error", retryErr)
+ return
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: time.Since(t),
+ })
+
+ j.Unlock()
+
+ case <-j.stopCh:
+ if j.publishChan != nil {
+ pch := <-j.publishChan
+ err := pch.Close()
+ if err != nil {
+ j.log.Error("publish channel close", "error", err)
+ }
+ }
+
+ if j.consumeChan != nil {
+ err := j.consumeChan.Close()
+ if err != nil {
+ j.log.Error("consume channel close", "error", err)
+ }
+ }
+ if j.conn != nil {
+ err := j.conn.Close()
+ if err != nil {
+ j.log.Error("amqp connection close", "error", err)
+ }
+ }
+
+ return
+ }
+ }
+ }()
+}
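The redialer bounds reconnect attempts with cenkalti/backoff, capped by retryTimeout. A runnable sketch of the same retry pattern with a stubbed dial operation standing in for amqp.Dial plus re-initialization:

    package main

    import (
        "errors"
        "fmt"
        "time"

        "github.com/cenkalti/backoff/v4"
    )

    func main() {
        attempts := 0
        dial := func() error { // stub for amqp.Dial + initRabbitMQ
            attempts++
            if attempts < 3 {
                return errors.New("connection refused")
            }
            return nil
        }

        expb := backoff.NewExponentialBackOff()
        expb.MaxElapsedTime = 5 * time.Minute // mirrors retryTimeout above

        if err := backoff.Retry(dial, expb); err != nil {
            fmt.Println("backoff failed:", err)
            return
        }
        fmt.Println("reconnected after", attempts, "attempts")
    }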
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
new file mode 100644
index 00000000..a8069f5d
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -0,0 +1,53 @@
+package beanstalk
+
+import (
+ "time"
+
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ tubePriority string = "tube_priority"
+ tube string = "tube"
+ reserveTimeout string = "reserve_timeout"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+ Timeout time.Duration `mapstructure:"timeout"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "tcp://127.0.0.1:11300"
+ }
+
+ if c.Timeout == 0 {
+ c.Timeout = time.Second * 30
+ }
+}
+
+type Config struct {
+ PipePriority int64 `mapstructure:"priority"`
+ TubePriority *uint32 `mapstructure:"tube_priority"`
+ Tube string `mapstructure:"tube"`
+ ReserveTimeout time.Duration `mapstructure:"reserve_timeout"`
+}
+
+func (c *Config) InitDefault() {
+ if c.Tube == "" {
+ c.Tube = "default"
+ }
+
+ if c.ReserveTimeout == 0 {
+ c.ReserveTimeout = time.Second * 1
+ }
+
+ if c.TubePriority == nil {
+ c.TubePriority = utils.Uint32(0)
+ }
+
+ if c.PipePriority == 0 {
+ c.PipePriority = 10
+ }
+}
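Addr above is a DSN of the form scheme://host:port, which the consumers below split into the network and address arguments for beanstalk.DialTimeout. A tiny sketch of that parse:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        dsn := strings.Split("tcp://127.0.0.1:11300", "://")
        if len(dsn) != 2 {
            panic("invalid DSN")
        }
        network, address := dsn[0], dsn[1]
        fmt.Println(network, address) // tcp 127.0.0.1:11300
    }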
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
new file mode 100644
index 00000000..32ca4188
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -0,0 +1,206 @@
+package beanstalk
+
+import (
+ "context"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type ConnPool struct {
+ sync.RWMutex
+
+ log logger.Logger
+
+ conn *beanstalk.Conn
+ connT *beanstalk.Conn
+ ts *beanstalk.TubeSet
+ t *beanstalk.Tube
+
+ network string
+ address string
+ tName string
+ tout time.Duration
+}
+
+func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
+ connT, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ connTS, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ tube := beanstalk.NewTube(connT, tName)
+ ts := beanstalk.NewTubeSet(connTS, tName)
+
+ return &ConnPool{
+ log: log,
+ network: network,
+ address: address,
+ tName: tName,
+ tout: tout,
+ conn: connTS,
+ connT: connT,
+ ts: ts,
+ t: tube,
+ }, nil
+}
+
+// Put puts the payload into the tube
+// TODO: use the context
+func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ // TODO(rustatian): redial based on the token
+ id, err := cp.t.Put(body, pri, delay, ttr)
+ if err != nil {
+ // errN wraps both err and any internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, errN
+ }
+ // retry Put only after a successful redial
+ return cp.t.Put(body, pri, delay, ttr)
+ }
+
+ return id, nil
+}
+
+// Reserve reserves and returns a job from one of the tubes in the tube set. If no
+// job is available before the timeout has passed, Reserve returns a
+// ConnError recording ErrTimeout.
+//
+// Typically, a client will reserve a job, perform some work, then delete
+// the job with Conn.Delete.
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ id, body, err := cp.ts.Reserve(reserveTimeout)
+ if err != nil {
+ // errN wraps both err and any internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, nil, errN
+ }
+ // retry Reserve only after a successful redial
+ return cp.ts.Reserve(reserveTimeout)
+ }
+
+ return id, body, nil
+}
+
+func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
+ cp.RLock()
+ defer cp.RUnlock()
+
+ err := cp.conn.Delete(id)
+ if err != nil {
+ // errN wraps both err and any internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return errN
+ }
+ // retry Delete only after a successful redial
+ return cp.conn.Delete(id)
+ }
+ return nil
+}
+
+func (cp *ConnPool) redial() error {
+ const op = errors.Op("connection_pool_redial")
+
+ cp.Lock()
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // TODO(rustatian) set via config
+ expb.MaxElapsedTime = time.Minute
+
+ operation := func() error {
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+
+ cp.log.Info("beanstalk redial was successful")
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ cp.Unlock()
+ return retryErr
+ }
+ cp.Unlock()
+
+ return nil
+}
+
+var connErrors = map[string]struct{}{"EOF": {}}
+
+func (cp *ConnPool) checkAndRedial(err error) error {
+ const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) { //nolint:gocritic
+ // check if the error is a beanstalk connection error
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
+
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", err, errR))
+ }
+ // if redial was successful -> continue listening
+ return nil
+ }
+ }
+ }
+
+ // return initial error
+ return err
+}
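checkAndRedial above runs while the caller holds the read lock, so it drops RLock around redial (which takes the write lock) and re-acquires it, keeping the caller's deferred RUnlock balanced. A generic, self-contained sketch of that lock-juggling pattern with stubbed state:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    type pool struct {
        mu    sync.RWMutex
        alive bool
    }

    func (p *pool) redial() { // takes the write lock, like ConnPool.redial
        p.mu.Lock()
        defer p.mu.Unlock()
        p.alive = true
    }

    func (p *pool) do() error {
        p.mu.RLock()
        defer p.mu.RUnlock()

        if !p.alive { // stand-in for a broken-connection error
            p.mu.RUnlock() // drop the read lock before upgrading
            p.redial()
            p.mu.RLock() // re-acquire so the deferred RUnlock balances
            if !p.alive {
                return errors.New("redial failed")
            }
        }
        return nil
    }

    func main() {
        p := &pool{}
        fmt.Println(p.do()) // <nil>
    }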
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
new file mode 100644
index 00000000..eaf99be1
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -0,0 +1,317 @@
+package beanstalk
+
+import (
+ "bytes"
+ "context"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type JobConsumer struct {
+ log logger.Logger
+ eh events.Handler
+ pq priorityqueue.Queue
+
+ pipeline atomic.Value
+ listeners uint32
+
+ // beanstalk
+ pool *ConnPool
+ addr string
+ network string
+ reserveTimeout time.Duration
+ reconnectCh chan struct{}
+ tout time.Duration
+ // tube name
+ tName string
+ tubePriority *uint32
+ priority int64
+
+ stopCh chan struct{}
+ requeueCh chan *Item
+}
+
+func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipeCfg.Tube,
+ reserveTimeout: pipeCfg.ReserveTimeout,
+ tubePriority: pipeCfg.TubePriority,
+ priority: pipeCfg.PipePriority,
+
+ // buffered with capacity 2 because the jobs root plugin can call Stop and Pause concurrently
+ stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
+ }
+
+ return jc, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_beanstalk_consumer")
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global beanstalk configuration, global configuration should contain beanstalk addrs and timeout"))
+ }
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ dsn := strings.Split(globalCfg.Addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://127.0.0.1:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
+ }
+
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, "default"),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ tubePriority: utils.Uint32(uint32(pipe.Int(tubePriority, 1))),
+ priority: pipe.Priority(),
+
+ // buffered with capacity 2 because the jobs root plugin can call Stop and Pause concurrently
+ stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
+ }
+
+ return jc, nil
+}
+
+func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+ const op = errors.Op("beanstalk_push")
+ // check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+ const op = errors.Op("beanstalk_handle_item")
+
+ bb := new(bytes.Buffer)
+ bb.Grow(64)
+ err := item.pack(bb)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt#L458
+ // <pri> is an integer < 2**32. Jobs with smaller priority values will be
+ // scheduled before jobs with larger priorities. The most urgent priority is 0;
+ // the least urgent priority is 4,294,967,295.
+ //
+ // <delay> is an integer number of seconds to wait before putting the job in
+ // the ready queue. The job will be in the "delayed" state during this time.
+ // Maximum delay is 2**32-1.
+ //
+ // <ttr> -- time to run -- is an integer number of seconds to allow a worker
+ // to run this job. This time is counted from the moment a worker reserves
+ // this job. If the worker does not delete, release, or bury the job within
+ // <ttr> seconds, the job will time out and the server will release the job.
+ // The minimum ttr is 1. If the client sends 0, the server will silently
+ // increase the ttr to 1. Maximum ttr is 2**32-1.
+ id, err := j.pool.Put(ctx, bb.Bytes(), *j.tubePriority, item.Options.DelayDuration(), j.tout)
+ if err != nil {
+ errD := j.pool.Delete(ctx, id)
+ if errD != nil {
+ return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
+ }
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
+ // register the pipeline
+ j.pipeline.Store(p)
+ return nil
+}
+
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("beanstalk_run")
+ // check if the pipeline is registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
+ }
+
+ atomic.AddUint32(&j.listeners, 1)
+
+ go j.listen()
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *JobConsumer) Stop(context.Context) error {
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+
+ if atomic.LoadUint32(&j.listeners) == 1 {
+ j.stopCh <- struct{}{}
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *JobConsumer) Pause(ctx context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ j.stopCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen()
+
+ // increase num of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
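Pause/Resume above track active listeners with a uint32 counter; the decrement uses atomic.AddUint32(&j.listeners, ^uint32(0)), where adding the all-ones value wraps around to a subtraction of one (the same trick appears in the amqp consumer). A minimal demonstration:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var listeners uint32

        atomic.AddUint32(&listeners, 1)          // Run/Resume: increment
        atomic.AddUint32(&listeners, ^uint32(0)) // Pause: adding ^0 wraps to -1

        fmt.Println(atomic.LoadUint32(&listeners)) // 0
    }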
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/jobs/drivers/beanstalk/encode_test.go
new file mode 100644
index 00000000..e43207eb
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/encode_test.go
@@ -0,0 +1,75 @@
+package beanstalk
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/gob"
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+func BenchmarkEncodeGob(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb := new(bytes.Buffer)
+ err := gob.NewEncoder(bb).Encode(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb.Bytes()
+ bb.Reset()
+ }
+}
+
+func BenchmarkEncodeJsonIter(b *testing.B) {
+ tb := make([]byte, 1024*10)
+ _, err := rand.Read(tb)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ item := &Item{
+ Job: "/super/test/php/class/loooooong",
+ Ident: "12341234-asdfasdfa-1234234-asdfasdfas",
+ Payload: utils.AsString(tb),
+ Headers: map[string][]string{"Test": {"test1", "test2"}},
+ Options: &Options{
+ Priority: 10,
+ Pipeline: "test-local-pipe",
+ Delay: 10,
+ },
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ bb, err := json.Marshal(item)
+ if err != nil {
+ b.Fatal(err)
+ }
+ _ = bb
+ }
+}
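These benchmarks compare gob, which the driver's pack/unpack (item.go below) actually use, against json-iterator on a ~10 KiB random payload. Assuming a standard Go toolchain, they can be run from this package directory with:

    go test -bench=Encode -benchmem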
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
new file mode 100644
index 00000000..f1d7ac76
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -0,0 +1,147 @@
+package beanstalk
+
+import (
+ "bytes"
+ "context"
+ "encoding/gob"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains the name of the job handler (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job, provided from outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+ // Headers contains key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority, default - 10
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Private ================
+ id uint64
+ conn *beanstalk.Conn
+ requeueFn func(context.Context, *Item) error
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used by the beanstalk driver
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ return i.Options.conn.Delete(i.Options.id)
+}
+
+func (i *Item) Nack() error {
+ return i.Options.conn.Delete(i.Options.id)
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ // delete old job
+ err = i.Options.conn.Delete(i.Options.id)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
+
+func (i *Item) pack(b *bytes.Buffer) error {
+ err := gob.NewEncoder(b).Encode(i)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error {
+ err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out)
+ if err != nil {
+ return err
+ }
+ out.Options.conn = j.pool.conn
+ out.Options.id = id
+ out.Options.requeueFn = j.handleItem
+
+ return nil
+}
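Since beanstalk carries only an opaque job body, the whole Item travels through gob (pack/unpack above); gob serializes exported fields only, which is why unpack restores the private conn, id and requeueFn by hand. A round-trip sketch using a local mirror of the exported fields:

    package main

    import (
        "bytes"
        "encoding/gob"
        "fmt"
    )

    // local mirror of Item's exported fields, for illustration only
    type item struct {
        Job     string
        Ident   string
        Payload string
        Headers map[string][]string
    }

    func main() {
        in := item{
            Job:     "app.job.Echo", // hypothetical handler name
            Ident:   "id-1",
            Payload: `{"n":1}`,
            Headers: map[string][]string{"k": {"v"}},
        }

        bb := new(bytes.Buffer)
        if err := gob.NewEncoder(bb).Encode(&in); err != nil {
            panic(err)
        }

        var out item
        if err := gob.NewDecoder(bb).Decode(&out); err != nil {
            panic(err)
        }
        fmt.Println(out.Job, out.Headers["k"][0]) // app.job.Echo v
    }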
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
new file mode 100644
index 00000000..f1385e70
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -0,0 +1,39 @@
+package beanstalk
+
+import (
+ "github.com/beanstalkd/go-beanstalk"
+)
+
+func (j *JobConsumer) listen() {
+ for {
+ select {
+ case <-j.stopCh:
+ j.log.Warn("beanstalk listener stopped")
+ return
+ default:
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
+ if err != nil {
+ if errB, ok := err.(beanstalk.ConnError); ok {
+ switch errB.Err { //nolint:gocritic
+ case beanstalk.ErrTimeout:
+ j.log.Info("beanstalk reserve timeout", "warn", errB.Op)
+ continue
+ }
+ }
+ // in case of other error - continue
+ j.log.Error("beanstalk reserve", "error", err)
+ continue
+ }
+
+ item := &Item{}
+ err = j.unpack(id, body, item)
+ if err != nil {
+ j.log.Error("beanstalk unpack item", "error", err)
+ continue
+ }
+
+ // insert job into the priority queue
+ j.pq.Insert(item)
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/jobs/drivers/beanstalk/plugin.go
new file mode 100644
index 00000000..529d1474
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/plugin.go
@@ -0,0 +1,47 @@
+package beanstalk
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "beanstalk"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
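+// Serve here is effectively a no-op: this plugin only provides consumer
+// constructors (JobsConstruct/FromPipeline), so the returned error channel is
+// never written to.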
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) JobsConstruct(configKey string, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewBeanstalkConsumer(configKey, p.log, p.cfg, eh, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, eh events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, eh, pq)
+}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
new file mode 100644
index 00000000..95ad6ecd
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -0,0 +1,244 @@
+package ephemeral
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ prefetch string = "prefetch"
+ goroutinesMax uint64 = 1000
+)
+
+type Config struct {
+ Prefetch uint64 `mapstructure:"prefetch"`
+}
+
+type JobConsumer struct {
+ cfg *Config
+ log logger.Logger
+ eh events.Handler
+ pipeline sync.Map
+ pq priorityqueue.Queue
+ localPrefetch chan *Item
+
+ // number of goroutines currently sleeping on a job delay (capped by goroutinesMax)
+ goroutines uint64
+
+ stopCh chan struct{}
+}
+
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_ephemeral_pipeline")
+
+ jb := &JobConsumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutines: 0,
+ stopCh: make(chan struct{}, 1),
+ }
+
+ err := cfg.UnmarshalKey(configKey, &jb.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ if jb.cfg.Prefetch == 0 {
+ jb.cfg.Prefetch = 100_000
+ }
+
+ // initialize a local queue
+ jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
+
+ // consume from the queue
+ go jb.consume()
+
+ return jb, nil
+}
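+
+// A construction sketch (hypothetical config key; assumes an initialized
+// logger, configurer, events handler and priority queue):
+//
+//	jb, err := NewJobBroker("jobs.pipelines.test-local", log, cfg, eh, pq)
+//	if err != nil {
+//		// handle error
+//	}
+//	err = jb.Register(context.Background(), pipe) // pipe is a *pipeline.Pipeline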
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ jb := &JobConsumer{
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutines: 0,
+ stopCh: make(chan struct{}, 1),
+ }
+
+ // initialize a local queue
+ jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
+
+ // consume from the queue
+ go jb.consume()
+
+ return jb, nil
+}
+
+func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+ const op = errors.Op("ephemeral_push")
+
+ // check if the pipeline registered
+ b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ }
+
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
+ }
+
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // Handle delayed jobs. A misbehaving user could push millions of delayed jobs
+ // and spawn an unbounded number of sleeping goroutines, so the number of
+ // concurrently waiting goroutines is capped at goroutinesMax. The check is
+ // optimistic (Load, then Add), so the cap is approximate by design.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutines) >= goroutinesMax {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutines, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutines, ^uint64(0))
+ }(msg)
+
+ return nil
+ }
+
+ // insert to the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
+}
+
+func (j *JobConsumer) consume() {
+ // redirect
+ for {
+ select {
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
+
+ // set requeue channel
+ item.Options.requeueFn = j.handleItem
+
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
+ }
+}
+
+func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("ephemeral_register")
+ if _, ok := j.pipeline.Load(pipeline.Name()); ok {
+ return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
+ }
+
+ j.pipeline.Store(pipeline.Name(), true)
+
+ return nil
+}
+
+func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
+ if q, ok := j.pipeline.Load(pipeline); ok {
+ if q == true {
+ // mark pipeline as turned off
+ j.pipeline.Store(pipeline, false)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Pipeline: pipeline,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ }
+ // if q is false, the pipe is already paused - do not send the event again
+ return
+ }
+
+ j.log.Error("no such pipeline", "requested", pipeline)
+}
+
+func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
+ if q, ok := j.pipeline.Load(pipeline); ok {
+ if q == false {
+ // mark pipeline as turned on
+ j.pipeline.Store(pipeline, true)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipeline,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ }
+ // if q is already true, the pipe is already active - do not send the event again
+ return
+ }
+
+ j.log.Error("no such pipeline", "requested", pipeline)
+}
+
+// Run is no-op for the ephemeral
+func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+func (j *JobConsumer) Stop(ctx context.Context) error {
+ const op = errors.Op("ephemeral_plugin_stop")
+ var pipe string
+ j.pipeline.Range(func(key, _ interface{}) bool {
+ pipe = key.(string)
+ j.pipeline.Delete(key)
+ return true
+ })
+
+ select {
+ // return from the consumer
+ case j.stopCh <- struct{}{}:
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ return nil
+
+ case <-ctx.Done():
+ return errors.E(op, ctx.Err())
+ }
+}
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
new file mode 100644
index 00000000..1a61d7e9
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -0,0 +1,115 @@
+package ephemeral
+
+import (
+ "context"
+ "time"
+
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+ // Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job; it should be provided from the outside.
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to the job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains a set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority (default 10).
+ // A zero value means the priority was not set; it will be inherited from the pipeline.
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ requeueFn func(context.Context, *Item) error
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ // noop for the in-memory
+ return nil
+}
+
+func (i *Item) Nack() error {
+ // noop for the in-memory
+ return nil
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/jobs/drivers/ephemeral/plugin.go
new file mode 100644
index 00000000..28495abb
--- /dev/null
+++ b/plugins/jobs/drivers/ephemeral/plugin.go
@@ -0,0 +1,41 @@
+package ephemeral
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "ephemeral"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
+
+// JobsConstruct creates new ephemeral consumer from the configuration
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewJobBroker(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline creates new ephemeral consumer from the provided pipeline
+func (p *Plugin) FromPipeline(pipeline *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipeline, p.log, e, pq)
+}
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
new file mode 100644
index 00000000..9b2a1ca8
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -0,0 +1,114 @@
+package sqs
+
+import "github.com/aws/aws-sdk-go-v2/aws"
+
+const (
+ attributes string = "attributes"
+ tags string = "tags"
+ queue string = "queue"
+ pref string = "prefetch"
+ visibility string = "visibility_timeout"
+ waitTime string = "wait_time"
+)
+
+type GlobalCfg struct {
+ Key string `mapstructure:"key"`
+ Secret string `mapstructure:"secret"`
+ Region string `mapstructure:"region"`
+ SessionToken string `mapstructure:"session_token"`
+ Endpoint string `mapstructure:"endpoint"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ // The duration (in seconds) that the received messages are hidden from subsequent
+ // retrieve requests after being retrieved by a ReceiveMessage request.
+ VisibilityTimeout int32 `mapstructure:"visibility_timeout"`
+ // The duration (in seconds) for which the call waits for a message to arrive
+ // in the queue before returning. If a message is available, the call returns
+ // sooner than WaitTimeSeconds. If no messages are available and the wait time
+ // expires, the call returns successfully with an empty list of messages.
+ WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
+ // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
+ // than this value (however, fewer messages might be returned). Valid values: 1 to
+ // 10. Default: 1.
+ Prefetch int32 `mapstructure:"prefetch"`
+ // The name of the new queue. The following limits apply to this name:
+ //
+ // * A queue
+ // name can have up to 80 characters.
+ //
+ // * Valid values: alphanumeric characters,
+ // hyphens (-), and underscores (_).
+ //
+ // * A FIFO queue name must end with the .fifo
+ // suffix.
+ //
+ // Queue URLs and names are case-sensitive.
+ //
+ // This member is required.
+ Queue *string `mapstructure:"queue"`
+
+ // A map of attributes with their corresponding values. The following lists the
+ // names, descriptions, and values of the special request parameters that the
+ // CreateQueue action uses.
+ // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
+ Attributes map[string]string `mapstructure:"attributes"`
+
+ // From amazon docs:
+ // Add cost allocation tags to the specified Amazon SQS queue. For an overview, see
+ // Tagging Your Amazon SQS Queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-tags.html)
+ // in the Amazon SQS Developer Guide. When you use queue tags, keep the following
+ // guidelines in mind:
+ //
+ // * Adding more than 50 tags to a queue isn't recommended.
+ //
+ // *
+ // Tags don't have any semantic meaning. Amazon SQS interprets tags as character
+ // strings.
+ //
+ // * Tags are case-sensitive.
+ //
+ // * A new tag with a key identical to that
+ // of an existing tag overwrites the existing tag.
+ //
+ // For a full list of tag
+ // restrictions, see Quotas related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-queues)
+ // in the Amazon SQS Developer Guide. To be able to tag a queue on creation, you
+ // must have the sqs:CreateQueue and sqs:TagQueue permissions. Cross-account
+ // permissions don't apply to this action. For more information, see Grant
+ // cross-account permissions to a role and a user name
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-customer-managed-policy-examples.html#grant-cross-account-permissions-to-role-and-user-name)
+ // in the Amazon SQS Developer Guide.
+ Tags map[string]string `mapstructure:"tags"`
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Endpoint == "" {
+ c.Endpoint = "http://127.0.0.1:9324"
+ }
+}
+
+func (c *Config) InitDefault() {
+ if c.Queue == nil {
+ c.Queue = aws.String("default")
+ }
+
+ if c.Prefetch == 0 || c.Prefetch > 10 {
+ c.Prefetch = 10
+ }
+
+ if c.WaitTimeSeconds == 0 {
+ c.WaitTimeSeconds = 5
+ }
+
+ if c.Attributes == nil {
+ c.Attributes = make(map[string]string)
+ }
+
+ if c.Tags == nil {
+ c.Tags = make(map[string]string)
+ }
+}
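+
+// A pipeline configuration sketch in YAML form (keys mirror the mapstructure
+// tags above; all values are hypothetical):
+//
+//	sqs:
+//	  key: api-key
+//	  secret: api-secret
+//	  region: us-west-1
+//	  endpoint: http://127.0.0.1:9324
+//
+//	jobs:
+//	  pipelines:
+//	    test-sqs:
+//	      driver: sqs
+//	      queue: default
+//	      prefetch: 10
+//	      visibility_timeout: 0
+//	      wait_time_seconds: 5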
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
new file mode 100644
index 00000000..9ce37543
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -0,0 +1,376 @@
+package sqs
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/google/uuid"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ cfgPlugin "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type JobConsumer struct {
+ sync.Mutex
+ pq priorityqueue.Queue
+ log logger.Logger
+ eh events.Handler
+ pipeline atomic.Value
+
+ // connection info
+ key string
+ secret string
+ sessionToken string
+ region string
+ endpoint string
+ queue *string
+ messageGroupID string
+ waitTime int32
+ prefetch int32
+ visibilityTimeout int32
+
+ // number of active listeners; guards against repeated Resume invocations
+ listeners uint32
+
+ // queue optional parameters
+ attributes map[string]string
+ tags map[string]string
+
+ client *sqs.Client
+ queueURL *string
+
+ pauseCh chan struct{}
+}
+
+func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: pipeCfg.Attributes,
+ tags: pipeCfg.Tags,
+ queue: pipeCfg.Queue,
+ prefetch: pipeCfg.Prefetch,
+ visibilityTimeout: pipeCfg.VisibilityTimeout,
+ waitTime: pipeCfg.WaitTimeSeconds,
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ pauseCh: make(chan struct{}, 1),
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a queue name that
+ // adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue,
+ // you must wait at least one second before the queue can be used, hence the
+ // sleep below. To get the queue URL, use the GetQueueUrl action.
+ time.Sleep(time.Second * 2)
+
+ return jb, nil
+}
+
+func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+ const op = errors.Op("new_sqs_consumer")
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ res := make(map[string]interface{})
+ pipe.Map(attributes, res)
+
+ attr := make(map[string]string)
+ // accept only string values
+ for i := range res {
+ if v, ok := res[i].(string); ok {
+ attr[i] = v
+ }
+ }
+
+ // clear the map (delete all keys) so it can be reused for the tags
+ for k := range res {
+ delete(res, k)
+ }
+
+ pipe.Map(tags, res)
+
+ tg := make(map[string]string)
+ // accept only string values
+ for i := range res {
+ if v, ok := res[i].(string); ok {
+ tg[i] = v
+ }
+ }
+
+ // initialize job consumer
+ jb := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ messageGroupID: uuid.NewString(),
+ attributes: attr,
+ tags: tg,
+ queue: aws.String(pipe.String(queue, "default")),
+ prefetch: int32(pipe.Int(pref, 10)),
+ visibilityTimeout: int32(pipe.Int(visibility, 0)),
+ waitTime: int32(pipe.Int(waitTime, 0)),
+ region: globalCfg.Region,
+ key: globalCfg.Key,
+ sessionToken: globalCfg.SessionToken,
+ secret: globalCfg.Secret,
+ endpoint: globalCfg.Endpoint,
+ pauseCh: make(chan struct{}, 1),
+ }
+
+ // PARSE CONFIGURATION -------
+
+ awsConf, err := config.LoadDefaultConfig(context.Background(),
+ config.WithRegion(globalCfg.Region),
+ config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(jb.key, jb.secret, jb.sessionToken)))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // config with retries
+ jb.client = sqs.NewFromConfig(awsConf, sqs.WithEndpointResolver(sqs.EndpointResolverFromURL(jb.endpoint)), func(o *sqs.Options) {
+ o.Retryer = retry.NewStandard(func(opts *retry.StandardOptions) {
+ opts.MaxAttempts = 60
+ })
+ })
+
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a queue name that
+ // adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue,
+ // you must wait at least one second before the queue can be used, hence the
+ // sleep below. To get the queue URL, use the GetQueueUrl action.
+ time.Sleep(time.Second * 2)
+
+ return jb, nil
+}
+
+func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+ const op = errors.Op("sqs_push")
+ // check if the pipeline registered
+
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != jb.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
+ }
+
+ // The length of time, in seconds, for which to delay a specific message. Valid
+ // values: 0 to 900. Maximum: 15 minutes.
+ if jb.Options.Delay > 900 {
+ return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
+ }
+
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ d, err := msg.pack(j.queueURL)
+ if err != nil {
+ return err
+ }
+ _, err = j.client.SendMessage(ctx, d)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ j.pipeline.Store(p)
+ return nil
+}
+
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_consume")
+
+ j.Lock()
+ defer j.Unlock()
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ atomic.AddUint32(&j.listeners, 1)
+
+ // start listener
+ go j.listen(context.Background())
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (j *JobConsumer) Stop(context.Context) error {
+ j.pauseCh <- struct{}{}
+
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+func (j *JobConsumer) Pause(_ context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 0 {
+ j.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&j.listeners, ^uint32(0))
+
+ // stop consume
+ j.pauseCh <- struct{}{}
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (j *JobConsumer) Resume(_ context.Context, p string) {
+ // load atomic value
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ j.log.Error("no such pipeline", "requested", p, "actual", pipe.Name())
+ return
+ }
+
+ l := atomic.LoadUint32(&j.listeners)
+ // no active listeners
+ if l == 1 {
+ j.log.Warn("sqs listener already in the active state")
+ return
+ }
+
+ // start listener
+ go j.listen(context.Background())
+
+ // increase num of listeners
+ atomic.AddUint32(&j.listeners, 1)
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
new file mode 100644
index 00000000..df72b2e5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -0,0 +1,247 @@
+package sqs
+
+import (
+ "context"
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ StringType string = "String"
+ NumberType string = "Number"
+ BinaryType string = "Binary"
+ ApproximateReceiveCount string = "ApproximateReceiveCount"
+)
+
+var itemAttributes = []string{
+ job.RRJob,
+ job.RRDelay,
+ job.RRPriority,
+ job.RRHeaders,
+}
+
+type Item struct {
+ // Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job; it should be provided from the outside.
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to the job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains a set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority (default 10).
+ // A zero value means the priority was not set; it will be inherited from the pipeline.
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // Private ================
+ approxReceiveCount int64
+ queue *string
+ receiptHandler *string
+ client *sqs.Client
+ requeueFn func(context.Context, *Item) error
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into a binary payload.
+// The SQS transport itself carries this data via MessageAttributes; Context is used when handing the job to a worker.
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Nack() error {
+ // requeue message
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ // requeue message
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ return err
+ }
+
+ // Delete job from the queue only after successful requeue
+ _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
+
+func (i *Item) pack(queue *string) (*sqs.SendMessageInput, error) {
+ // pack headers map
+ data, err := json.Marshal(i.Headers)
+ if err != nil {
+ return nil, err
+ }
+
+ return &sqs.SendMessageInput{
+ MessageBody: aws.String(i.Payload),
+ QueueUrl: queue,
+ DelaySeconds: int32(i.Options.Delay),
+ MessageAttributes: map[string]types.MessageAttributeValue{
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRHeaders: {DataType: aws.String(BinaryType), BinaryValue: data, BinaryListValues: nil, StringListValues: nil, StringValue: nil},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
+ },
+ }, nil
+}
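+
+// As a sketch, an item {Job: "ping", Options: {Delay: 5, Priority: 10}} packs
+// into a message whose body is the payload and whose DelaySeconds is 5, with
+// attributes rr_job="ping" (String), rr_delay="5" (String),
+// rr_priority="10" (Number) and rr_headers=<JSON bytes> (Binary);
+// unpack below reverses exactly this mapping.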
+
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
+ const op = errors.Op("sqs_unpack")
+ // reserved
+ if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
+ return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ }
+
+ for i := 0; i < len(itemAttributes); i++ {
+ if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok {
+ return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i]))
+ }
+ }
+
+ var h map[string][]string
+ err := json.Unmarshal(msg.MessageAttributes[job.RRHeaders].BinaryValue, &h)
+ if err != nil {
+ return nil, err
+ }
+
+ delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ item := &Item{
+ Job: *msg.MessageAttributes[job.RRJob].StringValue,
+ Payload: *msg.Body,
+ Headers: h,
+ Options: &Options{
+ Delay: int64(delay),
+ Priority: int64(priority),
+
+ // private
+ approxReceiveCount: int64(recCount),
+ client: j.client,
+ queue: j.queueURL,
+ receiptHandler: msg.ReceiptHandle,
+ requeueFn: j.handleItem,
+ },
+ }
+
+ return item, nil
+}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
new file mode 100644
index 00000000..9efef90d
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -0,0 +1,87 @@
+package sqs
+
+import (
+ "context"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws/transport/http"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/aws/smithy-go"
+)
+
+const (
+ // All - get all message attribute names
+ All string = "All"
+
+ // NonExistentQueue AWS error code
+ NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
+)
+
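+// listen polls ReceiveMessage until a signal arrives on pauseCh. Receive
+// errors are unwrapped through smithy.OperationError -> http.ResponseError ->
+// smithy.GenericAPIError to detect a deleted queue (NonExistentQueue) and
+// recreate it on the fly.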
+func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
+ for {
+ select {
+ case <-j.pauseCh:
+ j.log.Warn("sqs listener stopped")
+ return
+ default:
+ message, err := j.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: j.queueURL,
+ MaxNumberOfMessages: j.prefetch,
+ AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
+ MessageAttributeNames: []string{All},
+ // The new value for the message's visibility timeout (in seconds). Values range: 0
+ // to 43200. Maximum: 12 hours.
+ VisibilityTimeout: j.visibilityTimeout,
+ WaitTimeSeconds: j.waitTime,
+ })
+
+ if err != nil {
+ if oErr, ok := (err).(*smithy.OperationError); ok {
+ if rErr, ok := oErr.Err.(*http.ResponseError); ok {
+ if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
+ // in case of NonExistentQueue - recreate the queue
+ if apiErr.Code == NonExistentQueue {
+ j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ if err != nil {
+ j.log.Error("create queue", "error", err)
+ }
+ // To successfully create a new queue, you must provide a queue name that
+ // adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue,
+ // you must wait at least one second before the queue can be used, hence the
+ // sleep below. To get the queue URL, use the GetQueueUrl action.
+ time.Sleep(time.Second * 2)
+ continue
+ }
+ }
+ }
+ }
+
+ j.log.Error("receive message", "error", err)
+ continue
+ }
+
+ for i := 0; i < len(message.Messages); i++ {
+ m := message.Messages[i]
+ item, err := j.unpack(&m)
+ if err != nil {
+ _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: j.queueURL,
+ ReceiptHandle: m.ReceiptHandle,
+ })
+ if errD != nil {
+ j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
+ }
+
+ j.log.Error("message unpack", "error", err)
+ continue
+ }
+
+ j.pq.Insert(item)
+ }
+ }
+ }
+}
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/jobs/drivers/sqs/plugin.go
new file mode 100644
index 00000000..54f61ff5
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/plugin.go
@@ -0,0 +1,39 @@
+package sqs
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "sqs"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return NewSQSConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return FromPipeline(pipe, p.log, p.cfg, e, pq)
+}
diff --git a/plugins/jobs/job/general.go b/plugins/jobs/job/general.go
new file mode 100644
index 00000000..390f44b5
--- /dev/null
+++ b/plugins/jobs/job/general.go
@@ -0,0 +1,29 @@
+package job
+
+// constant keys to pack/unpack messages from different drivers
+const (
+ RRID string = "rr_id"
+ RRJob string = "rr_job"
+ RRHeaders string = "rr_headers"
+ RRPipeline string = "rr_pipeline"
+ RRDelay string = "rr_delay"
+ RRPriority string = "rr_priority"
+)
+
+// Job carries information about single job.
+type Job struct {
+ // Job contains the name of the job broker (usually a PHP class).
+ Job string `json:"job"`
+
+ // Ident is a unique identifier of the job; it should be provided from the outside.
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to the job broker.
+ Payload string `json:"payload"`
+
+ // Headers with key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+ // Options contains a set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
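+
+// A serialized Job, as a sketch (all values hypothetical):
+//
+//	{
+//	  "job": "App\\Job\\Ping",
+//	  "id": "0fd0f49c-26a4-4d84-a3c4-7e9e8f3271f5",
+//	  "payload": "{\"site\":\"example.com\"}",
+//	  "headers": {"attempt": ["1"]},
+//	  "options": {"priority": 10, "pipeline": "test-local"}
+//	}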
diff --git a/plugins/jobs/job/job_options.go b/plugins/jobs/job/job_options.go
new file mode 100644
index 00000000..b7e4ed36
--- /dev/null
+++ b/plugins/jobs/job/job_options.go
@@ -0,0 +1,32 @@
+package job
+
+import "time"
+
+// Options carry information about how to handle given job.
+type Options struct {
+ // Priority is the job priority (default 10).
+ // A zero value means the priority was not set; it will be inherited from the pipeline.
+ Priority int64 `json:"priority"`
+
+ // Pipeline manually specified pipeline.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+}
+
+// Merge merges job options.
+func (o *Options) Merge(from *Options) {
+ if o.Pipeline == "" {
+ o.Pipeline = from.Pipeline
+ }
+
+ if o.Delay == 0 {
+ o.Delay = from.Delay
+ }
+}
+
+// DelayDuration returns delay duration in a form of time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
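+
+// Merge semantics, as a sketch: merging {Pipeline: "emails", Delay: 5} into an
+// empty Options yields {Pipeline: "emails", Delay: 5}, while values already
+// set on the receiver are kept (see job_options_test.go below). Note that
+// Priority is not merged here; the jobs plugin inherits it from the pipeline.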
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go
new file mode 100644
index 00000000..a47151a3
--- /dev/null
+++ b/plugins/jobs/job/job_options_test.go
@@ -0,0 +1,45 @@
+package job
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestOptions_DelayDuration(t *testing.T) {
+ opts := &Options{Delay: 0}
+ assert.Equal(t, time.Duration(0), opts.DelayDuration())
+}
+
+func TestOptions_DelayDuration2(t *testing.T) {
+ opts := &Options{Delay: 1}
+ assert.Equal(t, time.Second, opts.DelayDuration())
+}
+
+func TestOptions_Merge(t *testing.T) {
+ opts := &Options{}
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ })
+
+ assert.Equal(t, "pipeline", opts.Pipeline)
+ assert.Equal(t, int64(2), opts.Delay)
+}
+
+func TestOptions_MergeKeepOriginal(t *testing.T) {
+ opts := &Options{
+ Pipeline: "default",
+ Delay: 10,
+ }
+
+ opts.Merge(&Options{
+ Pipeline: "pipeline",
+ Delay: 2,
+ })
+
+ assert.Equal(t, "default", opts.Pipeline)
+ assert.Equal(t, int64(10), opts.Delay)
+}
diff --git a/plugins/jobs/pipeline/pipeline.go b/plugins/jobs/pipeline/pipeline.go
new file mode 100644
index 00000000..2f4671d3
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline.go
@@ -0,0 +1,90 @@
+package pipeline
+
+// Pipeline defines pipeline options.
+type Pipeline map[string]interface{}
+
+const (
+ priority string = "priority"
+ driver string = "driver"
+ name string = "name"
+)
+
+// With sets a pipeline option value.
+func (p *Pipeline) With(name string, value interface{}) {
+ (*p)[name] = value
+}
+
+// Name returns pipeline name.
+func (p Pipeline) Name() string {
+ return p.String(name, "")
+}
+
+// Driver associated with the pipeline.
+func (p Pipeline) Driver() string {
+ return p.String(driver, "")
+}
+
+// Has checks if value presented in pipeline.
+func (p Pipeline) Has(name string) bool {
+ if _, ok := p[name]; ok {
+ return true
+ }
+
+ return false
+}
+
+// String returns the option value as a string, or the default d.
+func (p Pipeline) String(name string, d string) string {
+ if value, ok := p[name]; ok {
+ if str, ok := value.(string); ok {
+ return str
+ }
+ }
+
+ return d
+}
+
+// Int returns the option value as an int, or the default d.
+func (p Pipeline) Int(name string, d int) int {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(int); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
+// Bool returns the option value as a bool, or the default d.
+func (p Pipeline) Bool(name string, d bool) bool {
+ if value, ok := p[name]; ok {
+ if i, ok := value.(bool); ok {
+ return i
+ }
+ }
+
+ return d
+}
+
+// Map copies a nested map value (for example, sqs attributes or tags) into out;
+// out is left untouched when the key is absent.
+func (p Pipeline) Map(name string, out map[string]interface{}) {
+ if value, ok := p[name]; ok {
+ if m, ok := value.(map[string]interface{}); ok {
+ for k, v := range m {
+ out[k] = v
+ }
+ }
+ }
+}
+
+// Priority returns default pipeline priority
+func (p Pipeline) Priority() int64 {
+ if value, ok := p[priority]; ok {
+ if v, ok := value.(int64); ok {
+ return v
+ }
+ }
+
+ return 10
+}
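+
+// A usage sketch (hypothetical values):
+//
+//	pipe := Pipeline{"driver": "ephemeral", "name": "test-local", "priority": int64(5)}
+//	pipe.Driver()             // "ephemeral"
+//	pipe.Name()               // "test-local"
+//	pipe.Priority()           // 5
+//	pipe.Int("prefetch", 100) // 100, the default: the key is absent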
diff --git a/plugins/jobs/pipeline/pipeline_test.go b/plugins/jobs/pipeline/pipeline_test.go
new file mode 100644
index 00000000..4482c70d
--- /dev/null
+++ b/plugins/jobs/pipeline/pipeline_test.go
@@ -0,0 +1,21 @@
+package pipeline
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPipeline_String(t *testing.T) {
+ pipe := Pipeline{"value": "value"}
+
+ assert.Equal(t, "value", pipe.String("value", ""))
+ assert.Equal(t, "value", pipe.String("other", "value"))
+}
+
+func TestPipeline_Has(t *testing.T) {
+ pipe := Pipeline{"options": map[string]interface{}{"ttl": 10}}
+
+ assert.Equal(t, true, pipe.Has("options"))
+ assert.Equal(t, false, pipe.Has("other"))
+}
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
new file mode 100644
index 00000000..26015516
--- /dev/null
+++ b/plugins/jobs/plugin.go
@@ -0,0 +1,573 @@
+package jobs
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/pool"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+)
+
+const (
+ // RrMode env variable
+ RrMode string = "RR_MODE"
+ RrModeJobs string = "jobs"
+
+ PluginName string = "jobs"
+ pipelines string = "pipelines"
+)
+
+type Plugin struct {
+ sync.RWMutex
+
+ // Jobs plugin configuration
+ cfg *Config `structure:"jobs"`
+ log logger.Logger
+ workersPool pool.Pool
+ server server.Server
+
+ jobConstructors map[string]jobs.Constructor
+ consumers map[string]jobs.Consumer
+
+ // events handler
+ events events.Handler
+
+ // priority queue implementation
+ queue priorityqueue.Queue
+
+ // parent config for broker options. Keys are pipeline names, values are pointers to the associated pipeline
+ pipelines sync.Map
+
+ // initial set of the pipelines to consume
+ consume map[string]struct{}
+
+ // signal channel to stop the pollers
+ stopCh chan struct{}
+
+ // internal payloads pool
+ pldPool sync.Pool
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+ const op = errors.Op("jobs_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.server = server
+
+ p.events = events.NewEventsHandler()
+ p.events.AddListener(p.collectJobsEvents)
+
+ p.jobConstructors = make(map[string]jobs.Constructor)
+ p.consumers = make(map[string]jobs.Consumer)
+ p.consume = make(map[string]struct{})
+ p.stopCh = make(chan struct{}, 1)
+
+ p.pldPool = sync.Pool{New: func() interface{} {
+ // with nil fields
+ return &payload.Payload{}
+ }}
+
+ // initial set of pipelines
+ for i := range p.cfg.Pipelines {
+ p.pipelines.Store(i, p.cfg.Pipelines[i])
+ }
+
+ if len(p.cfg.Consume) > 0 {
+ for i := 0; i < len(p.cfg.Consume); i++ {
+ p.consume[p.cfg.Consume[i]] = struct{}{}
+ }
+ }
+
+ // initialize priority queue
+ p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
+ p.log = log
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error { //nolint:gocognit
+ errCh := make(chan error, 1)
+ const op = errors.Op("jobs_plugin_serve")
+
+ // register initial pipelines
+ p.pipelines.Range(func(key, value interface{}) bool {
+ t := time.Now()
+ // pipeline name (e.g. test-local, sqs-aws)
+ name := key.(string)
+
+ // pipeline associated with the name
+ pipe := value.(*pipeline.Pipeline)
+ // driver for the pipeline (e.g. amqp, ephemeral)
+ dr := pipe.Driver()
+
+ // jobConstructors contains constructors for the drivers
+ // we need here to initialize these drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // config key for the particular sub-driver jobs.pipelines.test-local
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, pipelines, name)
+
+ // init the driver
+ initializedDriver, err := c.JobsConstruct(configKey, p.events, p.queue)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[name] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(context.Background(), pipe)
+ if err != nil {
+ errCh <- errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipe.Driver(), pipe.Name()))
+ return false
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ if _, ok := p.consume[name]; ok {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ err = initializedDriver.Run(ctx, pipe)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return false
+ }
+ }
+
+ // the driver for the pipeline is initialized and registered
+ p.events.Push(events.JobEvent{
+ Event: events.EventDriverReady,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: time.Since(t),
+ })
+
+ return true
+ }
+
+ // no constructor registered for the pipeline's driver
+ p.log.Warn("no driver constructor registered for the pipeline", "driver", dr, "pipeline", name)
+
+ return true
+ })
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: "jobs"})
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ // start listening
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ go func() {
+ for {
+ select {
+ case <-p.stopCh:
+ p.log.Info("------> job poller stopped <------")
+ return
+ default:
+ // get prioritized JOB from the queue
+ jb := p.queue.ExtractMin()
+
+ // parse the context
+ // for each job, context contains:
+ /*
+ 1. Job class
+ 2. Job ID provided from the outside
+ 3. Job Headers map[string][]string
+ 4. Timeout in seconds
+ 5. Pipeline name
+ */
+
+ ctx, err := jb.Context()
+ if err != nil {
+ errNack := jb.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+ p.log.Error("job marshal context", "error", err)
+ continue
+ }
+
+ // get payload from the sync.Pool
+ exec := p.getPayload(jb.Body(), ctx)
+
+ // protect from the pool reset
+ p.RLock()
+ resp, err := p.workersPool.Exec(exec)
+ p.RUnlock()
+ if err != nil {
+ // RR protocol level error, Nack the job
+ errNack := jb.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed", "error", errNack)
+ }
+
+ p.log.Error("job execute failed", "error", err)
+
+ p.putPayload(exec)
+ continue
+ }
+
+ // if response is nil or body is nil, just acknowledge the job
+ if resp == nil || resp.Body == nil {
+ p.putPayload(exec)
+ err = jb.Ack()
+ if err != nil {
+ p.log.Error("acknowledge error, job might be missed", "error", err)
+ }
+ // nothing else to handle for an empty response
+ continue
+ }
+
+ // handle the response protocol
+ err = handleResponse(resp.Body, jb, p.log)
+ if err != nil {
+ p.putPayload(exec)
+ errNack := jb.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed, job might be lost", "root error", err, "error nack", errNack)
+ continue
+ }
+
+ p.log.Error("job negatively acknowledged", "error", err)
+ continue
+ }
+
+ // return payload
+ p.putPayload(exec)
+ }
+ }
+ }()
+ }
+ }()
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ for k, v := range p.consumers {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ err := v.Stop(ctx)
+ if err != nil {
+ cancel()
+ p.log.Error("stop job driver", "driver", k)
+ continue
+ }
+ cancel()
+ }
+
+ // this goroutine may block forever if some pollers have already exited;
+ // that is acceptable - the primary goal is to stop the drivers above, and
+ // any remaining pollers terminate with the process.
+ go func() {
+ for i := uint8(0); i < p.cfg.NumPollers; i++ {
+ // stop jobs plugin pollers
+ p.stopCh <- struct{}{}
+ }
+ }()
+
+ // give the pollers up to 5 seconds to stop before exiting
+ time.Sleep(time.Second * 5)
+
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectMQBrokers,
+ }
+}
+
+func (p *Plugin) CollectMQBrokers(name endure.Named, c jobs.Constructor) {
+ p.jobConstructors[name.Name()] = c
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Reset() error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("jobs_plugin_reset")
+ p.log.Info("JOBS plugin received restart request. Restarting...")
+ p.workersPool.Destroy(context.Background())
+ p.workersPool = nil
+
+ var err error
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.log.Info("JOBS workers pool successfully restarted")
+
+ return nil
+}
+
+func (p *Plugin) Push(j *job.Job) error {
+ const op = errors.Op("jobs_plugin_push")
+
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j.Options.Pipeline))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ // TODO(rustatian) merge all options, not only priority
+ if j.Options.Priority == 0 {
+ j.Options.Priority = ppl.Priority()
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+
+ err := d.Push(ctx, j)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (p *Plugin) PushBatch(j []*job.Job) error {
+ const op = errors.Op("jobs_plugin_push")
+
+ for i := 0; i < len(j); i++ {
+ // get the pipeline for the job
+ pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", j[i].Options.Pipeline))
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // if job has no priority, inherit it from the pipeline
+ if j[i].Options.Priority == 0 {
+ j[i].Options.Priority = ppl.Priority()
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ err := d.Push(ctx, j[i])
+ if err != nil {
+ cancel()
+ return errors.E(op, err)
+ }
+
+ cancel()
+ }
+
+ return nil
+}
+
+func (p *Plugin) Pause(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
+
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ return
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ // redirect call to the underlying driver
+ d.Pause(ctx, ppl.Name())
+}
+
+func (p *Plugin) Resume(pp string) {
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ p.log.Error("no such pipeline", "requested", pp)
+ return
+ }
+
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ p.log.Warn("driver for the pipeline not found", "pipeline", pp)
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ // redirect call to the underlying driver
+ d.Resume(ctx, ppl.Name())
+}
+
+// Declare a pipeline.
+func (p *Plugin) Declare(pipeline *pipeline.Pipeline) error {
+ const op = errors.Op("jobs_plugin_declare")
+ // driver for the pipeline (e.g. amqp, ephemeral)
+ dr := pipeline.Driver()
+ if dr == "" {
+ return errors.E(op, errors.Errorf("no associated driver with the pipeline, pipeline name: %s", pipeline.Name()))
+ }
+
+ // jobConstructors contains constructors for the drivers
+ // we need here to initialize these drivers for the pipelines
+ if c, ok := p.jobConstructors[dr]; ok {
+ // init the driver from pipeline
+ initializedDriver, err := c.FromPipeline(pipeline, p.events, p.queue)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // add driver to the set of the consumers (name - pipeline name, value - associated driver)
+ p.consumers[pipeline.Name()] = initializedDriver
+
+ // register pipeline for the initialized driver
+ err = initializedDriver.Register(context.Background(), pipeline)
+ if err != nil {
+ return errors.E(op, errors.Errorf("pipe register failed for the driver: %s with pipe name: %s", pipeline.Driver(), pipeline.Name()))
+ }
+
+ // if pipeline initialized to be consumed, call Run on it
+ // but likely for the dynamic pipelines it should be started manually
+ if _, ok := p.consume[pipeline.Name()]; ok {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+ err = initializedDriver.Run(ctx, pipeline)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ // save the pipeline
+ p.pipelines.Store(pipeline.Name(), pipeline)
+
+ return nil
+}
+
+// Destroy pipeline and release all associated resources.
+func (p *Plugin) Destroy(pp string) error {
+ const op = errors.Op("jobs_plugin_destroy")
+ pipe, ok := p.pipelines.Load(pp)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline, requested: %s", pp))
+ }
+
+ // type conversion
+ ppl := pipe.(*pipeline.Pipeline)
+
+ d, ok := p.consumers[ppl.Name()]
+ if !ok {
+ return errors.E(op, errors.Errorf("consumer not registered for the requested driver: %s", ppl.Driver()))
+ }
+
+ // delete consumer
+ delete(p.consumers, ppl.Name())
+ p.pipelines.Delete(pp)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(p.cfg.Timeout))
+ defer cancel()
+
+ return d.Stop(ctx)
+}
+
+func (p *Plugin) List() []string {
+ out := make([]string, 0, 10)
+
+ p.pipelines.Range(func(key, _ interface{}) bool {
+ // we can safely cast the key to a string here, since we only store string keys
+ out = append(out, key.(string))
+ return true
+ })
+
+ return out
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ log: p.log,
+ p: p,
+ }
+}
+
+func (p *Plugin) collectJobsEvents(event interface{}) {
+ if jev, ok := event.(events.JobEvent); ok {
+ switch jev.Event {
+ case events.EventPipePaused:
+ p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobStart:
+ p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobOK:
+ p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushOK:
+ p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPushError:
+ p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventJobError:
+ p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeActive:
+ p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeStopped:
+ p.log.Warn("pipeline stopped", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventPipeError:
+ p.log.Error("pipeline error", "pipeline", jev.Pipeline, "error", jev.Error, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventDriverReady:
+ p.log.Info("driver ready", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ case events.EventInitialized:
+ p.log.Info("driver initialized", "driver", jev.Driver, "start", jev.Start.UTC())
+ }
+ }
+}
+
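+// getPayload takes a payload object from the pool and fills it with the job
+// body and context, avoiding a fresh allocation per processed job.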
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
+}
+
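+// putPayload clears the payload fields before returning the object to the
+// pool, so pooled objects do not retain references to job data.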
+func (p *Plugin) putPayload(pld *payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}
diff --git a/plugins/jobs/protocol.go b/plugins/jobs/protocol.go
new file mode 100644
index 00000000..9d769fdf
--- /dev/null
+++ b/plugins/jobs/protocol.go
@@ -0,0 +1,78 @@
+package jobs
+
+import (
+ json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
+ pq "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Type uint32
+
+const (
+ NoError Type = iota
+ Error
+)
+
+// internal worker protocol (jobs mode)
+type protocol struct {
+ // message type, see Type
+ T Type `json:"type"`
+ // Payload
+ Data json.RawMessage `json:"data"`
+}
+
+type errorResp struct {
+ Msg string `json:"message"`
+ Requeue bool `json:"requeue"`
+ Delay int64 `json:"delay_seconds"`
+ Headers map[string][]string `json:"headers"`
+}
+
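+// handleResponse routes a worker reply: NoError (and unknown types) ack the
+// job; Error either requeues it with the supplied headers and delay, or
+// surfaces the error message to the caller.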
+func handleResponse(resp []byte, jb pq.Item, log logger.Logger) error {
+ const op = errors.Op("jobs_handle_response")
+ // TODO(rustatian) to sync.Pool
+ p := &protocol{}
+
+ err := json.Unmarshal(resp, p)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ switch p.T {
+ // likely case
+ case NoError:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ case Error:
+ // TODO(rustatian) to sync.Pool
+ er := &errorResp{}
+
+ err = json.Unmarshal(p.Data, er)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ log.Error("jobs protocol error", "error", er.Msg, "delay", er.Delay, "requeue", er.Requeue)
+
+ if er.Requeue {
+ err = jb.Requeue(er.Headers, er.Delay)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("jobs response error: %v", er.Msg))
+
+ default:
+ err = jb.Ack()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+}
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
new file mode 100644
index 00000000..c89877e3
--- /dev/null
+++ b/plugins/jobs/response_protocol.md
@@ -0,0 +1,54 @@
+The response protocol is used to communicate between the worker and RR. When a worker completes its job, it should
+send a typed response containing:
+
+1. A `type` field with the message type, which can be treated as an enum.
+2. A `data` field with the dynamic payload related to the type.
+
+Types are:
+
+```
+0 - NO_ERROR
+1 - ERROR
+2 - ...
+```
+
+- `NO_ERROR`: contains only the `type` and an empty `data`.
+- `ERROR`: contains `type`: 1 and a `data` field with: a `message` describing the error, a `requeue` flag to requeue
+ the job, `delay_seconds` to delay the requeued job by the provided number of seconds, and `headers` - the job's
+ headers represented as a hashmap with string keys and an array of strings as the value.
+
+For example:
+
+`NO_ERROR`:
+
+```json
+{
+ "type": 0,
+ "data": {}
+}
+```
+
+`ERROR`:
+
+```json
+{
+ "type": 1,
+ "data": {
+ "message": "internal worker error",
+ "requeue": true,
+ "headers": [
+ {
+ "test": [
+ "1",
+ "2",
+ "3"
+ ]
+ }
+ ],
+ "delay_seconds": 10
+ }
+}
+```
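+
+For reference, here is a minimal Go sketch that emits an `ERROR` response of the shape above (the `response` and
+`errorData` types are illustrative stand-ins that mirror the internal protocol structures, not a public API):
+
+```go
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// response mirrors the top-level protocol envelope.
+type response struct {
+ Type uint32 `json:"type"`
+ Data json.RawMessage `json:"data"`
+}
+
+// errorData mirrors the ERROR payload described above.
+type errorData struct {
+ Msg string `json:"message"`
+ Requeue bool `json:"requeue"`
+ Delay int64 `json:"delay_seconds"`
+ Headers map[string][]string `json:"headers"`
+}
+
+func main() {
+ // marshal the inner payload first, then wrap it into the envelope
+ data, err := json.Marshal(errorData{
+ Msg: "internal worker error",
+ Requeue: true,
+ Delay: 10,
+ Headers: map[string][]string{"test": {"1", "2", "3"}},
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ out, err := json.Marshal(response{Type: 1, Data: data})
+ if err != nil {
+ panic(err)
+ }
+
+ fmt.Println(string(out))
+}
+```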
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
new file mode 100644
index 00000000..7f9859fb
--- /dev/null
+++ b/plugins/jobs/rpc.go
@@ -0,0 +1,136 @@
+package jobs
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
+)
+
+type rpc struct {
+ log logger.Logger
+ p *Plugin
+}
+
+func (r *rpc) Push(j *jobsv1beta.PushRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ // convert the transport entity into the domain entity
+
+ if j.GetJob().GetId() == "" {
+ return errors.E(op, errors.Str("empty ID field not allowed"))
+ }
+
+ err := r.p.Push(r.from(j.GetJob()))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (r *rpc) PushBatch(j *jobsv1beta.PushBatchRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("jobs_rpc_push")
+
+ l := len(j.GetJobs())
+
+ batch := make([]*job.Job, l)
+
+ for i := 0; i < l; i++ {
+ // convert each transport entity into a domain entity
+ batch[i] = r.from(j.GetJobs()[i])
+ }
+
+ err := r.p.PushBatch(batch)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (r *rpc) Pause(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Pause(req.GetPipelines()[i])
+ }
+
+ return nil
+}
+
+func (r *rpc) Resume(req *jobsv1beta.Pipelines, _ *jobsv1beta.Empty) error {
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ r.p.Resume(req.GetPipelines()[i])
+ }
+
+ return nil
+}
+
+func (r *rpc) List(_ *jobsv1beta.Empty, resp *jobsv1beta.Pipelines) error {
+ resp.Pipelines = r.p.List()
+ return nil
+}
+
+// Declare is used to dynamically declare a pipeline of any type.
+// Mandatory fields:
+// 1. Driver
+// 2. Pipeline name
+// 3. Options related to the particular pipeline
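+//
+// A minimal illustrative pipeline map (the keys shown are assumptions based on
+// the drivers in this changeset, not a fixed schema):
+//
+// driver: ephemeral
+// name: dynamic-pipe
+// priority: 10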
+func (r *rpc) Declare(req *jobsv1beta.DeclareRequest, _ *jobsv1beta.Empty) error {
+ const op = errors.Op("rcp_declare_pipeline")
+ pipe := &pipeline.Pipeline{}
+
+ for i := range req.GetPipeline() {
+ (*pipe)[i] = req.GetPipeline()[i]
+ }
+
+ err := r.p.Declare(pipe)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (r *rpc) Destroy(req *jobsv1beta.Pipelines, resp *jobsv1beta.Pipelines) error {
+ const op = errors.Op("rcp_declare_pipeline")
+
+ var destroyed []string //nolint:prealloc
+ for i := 0; i < len(req.GetPipelines()); i++ {
+ err := r.p.Destroy(req.GetPipelines()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+ destroyed = append(destroyed, req.GetPipelines()[i])
+ }
+
+ // return destroyed pipelines
+ resp.Pipelines = destroyed
+
+ return nil
+}
+
+// from converts from transport entity to domain
+func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
+ headers := map[string][]string{}
+
+ for k, v := range j.GetHeaders() {
+ headers[k] = v.GetValue()
+ }
+
+ jb := &job.Job{
+ Job: j.GetJob(),
+ Headers: headers,
+ Ident: j.GetId(),
+ Payload: j.GetPayload(),
+ Options: &job.Options{
+ Priority: j.GetOptions().GetPriority(),
+ Pipeline: j.GetOptions().GetPipeline(),
+ Delay: j.GetOptions().GetDelay(),
+ },
+ }
+
+ return jb
+}