path: root/plugins/amqp
author     Valery Piashchynski <[email protected]>   2021-08-31 15:31:30 +0300
committer  GitHub <[email protected]>                2021-08-31 15:31:30 +0300
commit     83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree       884dd2991acf12826752632b8321410e7cc923ce /plugins/amqp
parent     0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent     31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/amqp')
-rw-r--r--  plugins/amqp/amqpjobs/config.go        67
-rw-r--r--  plugins/amqp/amqpjobs/consumer.go     516
-rw-r--r--  plugins/amqp/amqpjobs/item.go         239
-rw-r--r--  plugins/amqp/amqpjobs/listener.go      25
-rw-r--r--  plugins/amqp/amqpjobs/rabbit_init.go   57
-rw-r--r--  plugins/amqp/amqpjobs/redial.go       141
-rw-r--r--  plugins/amqp/plugin.go                 41
7 files changed, 1086 insertions, 0 deletions
diff --git a/plugins/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go
new file mode 100644
index 00000000..ac2f6e53
--- /dev/null
+++ b/plugins/amqp/amqpjobs/config.go
@@ -0,0 +1,67 @@
+package amqpjobs
+
+// rabbitmq pipeline configuration keys and AMQP constants
+const (
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange_type"
+ queue string = "queue"
+ routingKey string = "routing_key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+ multipleAsk string = "multiple_ask"
+ requeueOnFail string = "requeue_on_fail"
+
+ dlx string = "x-dead-letter-exchange"
+ dlxRoutingKey string = "x-dead-letter-routing-key"
+ dlxTTL string = "x-message-ttl"
+ dlxExpires string = "x-expires"
+
+ contentType string = "application/octet-stream"
+)
+
+type GlobalCfg struct {
+ Addr string `mapstructure:"addr"`
+}
+
+// Config is used to parse pipeline configuration
+type Config struct {
+ Prefetch int `mapstructure:"prefetch"`
+ Queue string `mapstructure:"queue"`
+ Priority int64 `mapstructure:"priority"`
+ Exchange string `mapstructure:"exchange"`
+ ExchangeType string `mapstructure:"exchange_type"`
+ RoutingKey string `mapstructure:"routing_key"`
+ Exclusive bool `mapstructure:"exclusive"`
+ MultipleAck bool `mapstructure:"multiple_ask"`
+ RequeueOnFail bool `mapstructure:"requeue_on_fail"`
+}
+
+func (c *Config) InitDefault() {
+ // all options should be in sync with the pipeline defaults in the FromPipeline method
+ if c.ExchangeType == "" {
+ c.ExchangeType = "direct"
+ }
+
+ if c.Exchange == "" {
+ c.Exchange = "amqp.default"
+ }
+
+ if c.Queue == "" {
+ c.Queue = "default"
+ }
+
+ if c.Prefetch == 0 {
+ c.Prefetch = 10
+ }
+
+ if c.Priority == 0 {
+ c.Priority = 10
+ }
+}
+
+func (c *GlobalCfg) InitDefault() {
+ if c.Addr == "" {
+ c.Addr = "amqp://guest:[email protected]:5672/"
+ }
+}
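
The mapstructure tags above define the per-pipeline option keys (queue, exchange, exchange_type, routing_key, prefetch, priority, exclusive, multiple_ask, requeue_on_fail), and InitDefault fills in whatever the pipeline omits. Below is a minimal sketch of how such a raw options map decodes into this Config, using github.com/mitchellh/mapstructure directly for illustration; the driver itself goes through the config plugin's UnmarshalKey (see consumer.go below), and the sample values are hypothetical.

    package main

    import (
        "fmt"

        "github.com/mitchellh/mapstructure"
    )

    // a subset of the driver's Config, with the same mapstructure tags
    type pipeCfg struct {
        Prefetch     int    `mapstructure:"prefetch"`
        Queue        string `mapstructure:"queue"`
        Priority     int64  `mapstructure:"priority"`
        Exchange     string `mapstructure:"exchange"`
        ExchangeType string `mapstructure:"exchange_type"`
        RoutingKey   string `mapstructure:"routing_key"`
    }

    func main() {
        // raw pipeline options as they might be handed to UnmarshalKey (values are hypothetical)
        raw := map[string]interface{}{
            "queue":       "orders",
            "routing_key": "orders",
            "prefetch":    100,
        }

        var c pipeCfg
        if err := mapstructure.Decode(raw, &c); err != nil {
            panic(err)
        }

        // fill the gaps the same way InitDefault does
        if c.Exchange == "" {
            c.Exchange = "amqp.default"
        }
        if c.ExchangeType == "" {
            c.ExchangeType = "direct"
        }
        if c.Priority == 0 {
            c.Priority = 10
        }

        // {Prefetch:100 Queue:orders Priority:10 Exchange:amqp.default ExchangeType:direct RoutingKey:orders}
        fmt.Printf("%+v\n", c)
    }
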
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
new file mode 100644
index 00000000..784a102c
--- /dev/null
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -0,0 +1,516 @@
+package amqpjobs
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+const (
+ pluginName string = "amqp"
+)
+
+type consumer struct {
+ sync.Mutex
+ log logger.Logger
+ pq priorityqueue.Queue
+ eh events.Handler
+
+ pipeline atomic.Value
+
+ // amqp connection
+ conn *amqp.Connection
+ consumeChan *amqp.Channel
+ publishChan chan *amqp.Channel
+ consumeID string
+ connStr string
+
+ retryTimeout time.Duration
+ //
+ // prefetch QoS AMQP
+ //
+ prefetch int
+ //
+ // pipeline's priority
+ //
+ priority int64
+ exchangeName string
+ queue string
+ exclusive bool
+ exchangeType string
+ routingKey string
+ multipleAck bool
+ requeueOnFail bool
+
+ listeners uint32
+ delayed *int64
+ stopCh chan struct{}
+}
+
+// NewAMQPConsumer initializes rabbitmq pipeline
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("new_amqp_consumer")
+ // we need to obtain two parts of the amqp information here.
+	// first part - the address to connect to, located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+ // if no such key - error
+ if !cfg.Has(configKey) {
+ return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION START -------
+ var pipeCfg Config
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(configKey, &pipeCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipeCfg.InitDefault()
+
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
+
+ jb := &consumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ priority: pipeCfg.Priority,
+ delayed: utils.Int64(0),
+
+ publishChan: make(chan *amqp.Channel, 1),
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pch, err := jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan <- pch
+
+	// run the redialer for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ const op = errors.Op("new_amqp_consumer_from_pipeline")
+ // we need to obtain two parts of the amqp information here.
+	// first part - the address to connect to, located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+
+ // only global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ // PARSE CONFIGURATION -------
+
+ jb := &consumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayed: utils.Int64(0),
+
+ publishChan: make(chan *amqp.Channel, 1),
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, false),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pch, err := jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan <- pch
+
+ // register the pipeline
+ // error here is always nil
+ _ = jb.Register(context.Background(), pipeline)
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
+}
+
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
+ const op = errors.Op("rabbitmq_push")
+	// check if the pipeline is registered
+
+ // load atomic value
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != job.Options.Pipeline {
+ return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
+ }
+
+ err := c.handleItem(ctx, fromJob(job))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
+ return nil
+}
+
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_run")
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p.Name() {
+ return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
+ }
+
+ // protect connection (redial)
+ c.Lock()
+ defer c.Unlock()
+
+ var err error
+ c.consumeChan, err = c.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // start reading messages from the channel
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // run listener
+ c.listener(deliv)
+
+ atomic.StoreUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
+ return nil
+}
+
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
+ const op = errors.Op("amqp_driver_state")
+ select {
+ case pch := <-c.publishChan:
+ defer func() {
+ c.publishChan <- pch
+ }()
+
+ q, err := pch.QueueInspect(c.queue)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ return &jobState.State{
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Queue: q.Name,
+ Active: int64(q.Messages),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
+ }, nil
+
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested pause on: ", p)
+ }
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 0 {
+ c.log.Warn("no active listeners, nothing to pause")
+ return
+ }
+
+ atomic.AddUint32(&c.listeners, ^uint32(0))
+
+ // protect connection (redial)
+ c.Lock()
+ defer c.Unlock()
+
+ err := c.consumeChan.Cancel(c.consumeID, true)
+ if err != nil {
+ c.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := c.consumeChan.Close()
+ if errCl != nil {
+ c.log.Error("force close failed", "error", err)
+ return
+ }
+ return
+ }
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipePaused,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ if pipe.Name() != p {
+ c.log.Error("no such pipeline", "requested resume on: ", p)
+ }
+
+ // protect connection (redial)
+ c.Lock()
+ defer c.Unlock()
+
+ l := atomic.LoadUint32(&c.listeners)
+ // no active listeners
+ if l == 1 {
+ c.log.Warn("amqp listener already in the active state")
+ return
+ }
+
+ var err error
+ c.consumeChan, err = c.conn.Channel()
+ if err != nil {
+ c.log.Error("create channel on rabbitmq connection", "error", err)
+ return
+ }
+
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
+ if err != nil {
+ c.log.Error("qos set failed", "error", err)
+ return
+ }
+
+ // start reading messages from the channel
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ c.log.Error("consume operation failed", "error", err)
+ return
+ }
+
+ // run listener
+ c.listener(deliv)
+
+ // increase number of listeners
+ atomic.AddUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+}
+
+func (c *consumer) Stop(context.Context) error {
+ if atomic.LoadUint32(&c.listeners) > 0 {
+ c.stopCh <- struct{}{}
+ }
+
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+ return nil
+}
+
+// handleItem publishes the item to the exchange; items with a delay are routed through a temporary dead-letter queue
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("rabbitmq_handle_item")
+ select {
+ case pch := <-c.publishChan:
+ // return the channel back
+ defer func() {
+ c.publishChan <- pch
+ }()
+
+		// convert the job metadata into AMQP headers
+ table, err := pack(msg.ID(), msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ const op = errors.Op("rabbitmq_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ atomic.AddInt64(c.delayed, 1)
+ // TODO declare separate method for this if condition
+ // TODO dlx cache channel??
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue)
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: c.exchangeName,
+ dlxRoutingKey: c.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ atomic.AddInt64(c.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil)
+ if err != nil {
+ atomic.AddInt64(c.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+			// publish the job to the temporary delayed queue
+ err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ atomic.AddInt64(c.delayed, ^int64(0))
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+		// publish the job to the exchange with the configured routing key
+ err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+
+func ready(r uint32) bool {
+ return r > 0
+}
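
The delayed branch of handleItem above implements delays without any broker plugin: for a delayed job it declares a temporary queue whose messages expire after the delay (x-message-ttl) and are then dead-lettered back into the pipeline's exchange with the original routing key, while the temporary queue itself is dropped once idle (x-expires). The following standalone sketch reproduces that pattern with rabbitmq/amqp091-go; the connection address, exchange, queue, and payload are illustrative, and the target exchange plus the main queue binding are assumed to already exist (as initRabbitMQ sets up).

    package main

    import (
        "fmt"
        "time"

        amqp "github.com/rabbitmq/amqp091-go"
    )

    // publishDelayed mimics the delayed branch of handleItem: the message sits in a
    // temporary queue for delayMs, then is dead-lettered into `exchange` with `routingKey`.
    func publishDelayed(ch *amqp.Channel, exchange, routingKey, queue string, body []byte, delayMs int64) error {
        tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, exchange, queue)

        // the temporary queue dead-letters expired messages back into the target exchange
        _, err := ch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
            "x-dead-letter-exchange":    exchange,
            "x-dead-letter-routing-key": routingKey,
            "x-message-ttl":             delayMs,
            "x-expires":                 delayMs * 2, // drop the temporary queue once it is no longer needed
        })
        if err != nil {
            return err
        }

        // bind the temporary queue so the publish below reaches it through the exchange
        if err := ch.QueueBind(tmpQ, tmpQ, exchange, false, nil); err != nil {
            return err
        }

        return ch.Publish(exchange, tmpQ, false, false, amqp.Publishing{
            ContentType:  "application/octet-stream",
            DeliveryMode: amqp.Persistent,
            Timestamp:    time.Now(),
            Body:         body,
        })
    }

    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // address is illustrative
        if err != nil {
            panic(err)
        }
        defer conn.Close()

        ch, err := conn.Channel()
        if err != nil {
            panic(err)
        }
        defer ch.Close()

        // deliver "hello" via the "amqp.default" exchange to the "test" queue roughly 5 seconds from now
        if err := publishDelayed(ch, "amqp.default", "test", "test", []byte("hello"), 5000); err != nil {
            panic(err)
        }
    }
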
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
new file mode 100644
index 00000000..66b70a36
--- /dev/null
+++ b/plugins/amqp/amqpjobs/item.go
@@ -0,0 +1,239 @@
+package amqpjobs
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ json "github.com/json-iterator/go"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/job"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type Item struct {
+	// Job contains the name of the job (usually a PHP class name).
+ Job string `json:"job"`
+
+	// Ident is a unique identifier of the job; it should be provided from the outside
+ Ident string `json:"id"`
+
+ // Payload is string data (usually JSON) passed to Job broker.
+ Payload string `json:"payload"`
+
+	// Headers contains key-value pairs
+ Headers map[string][]string `json:"headers"`
+
+	// Options contains a set of PipelineOptions specific to job execution. Can be empty.
+ Options *Options `json:"options,omitempty"`
+}
+
+// Options carry information about how to handle given job.
+type Options struct {
+	// Priority is the job priority, default - 10.
+	// When the priority header is absent, the pipeline's priority is used.
+ Priority int64 `json:"priority"`
+
+	// Pipeline is the manually specified pipeline name.
+ Pipeline string `json:"pipeline,omitempty"`
+
+ // Delay defines time duration to delay execution for. Defaults to none.
+ Delay int64 `json:"delay,omitempty"`
+
+ // private
+ // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
+ ack func(multiply bool) error
+
+ // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
+	// When multiple is true, nack all messages up to and including the given delivery tag on the same channel.
+ // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
+	// This method must not be used to select or requeue messages the client wishes not to handle; rather, it informs the server that the client is incapable of handling this message at this time.
+ nack func(multiply bool, requeue bool) error
+
+ // requeueFn used as a pointer to the push function
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
+ multipleAsk bool
+ requeue bool
+}
+
+// DelayDuration returns the delay in the form of a time.Duration.
+func (o *Options) DelayDuration() time.Duration {
+ return time.Second * time.Duration(o.Delay)
+}
+
+func (i *Item) ID() string {
+ return i.Ident
+}
+
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
+}
+
+// Body packs job payload into binary payload.
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
+}
+
+// Context packs job context (job, id) into binary payload.
+// Not used by the amqp driver; amqp.Table is used instead.
+func (i *Item) Context() ([]byte, error) {
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
+ return i.Options.ack(i.Options.multipleAsk)
+}
+
+func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
+ return i.Options.nack(false, i.Options.requeue)
+}
+
+// Requeue pushes the item back with the provided delay; on a push failure the item is nacked, otherwise it is acked
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
+ // overwrite the delay
+ i.Options.Delay = delay
+ i.Headers = headers
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errNack := i.Options.nack(false, true)
+ if errNack != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
+ }
+
+ return err
+ }
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
+func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+ const op = errors.Op("from_delivery_convert")
+ item, err := c.unpack(d)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ i := &Item{
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ }
+
+ item.Options.ack = d.Ack
+ item.Options.nack = d.Nack
+ item.Options.delayed = c.delayed
+
+ // requeue func
+ item.Options.requeueFn = c.handleItem
+ return i, nil
+}
+
+func fromJob(job *job.Job) *Item {
+ return &Item{
+ Job: job.Job,
+ Ident: job.Ident,
+ Payload: job.Payload,
+ Headers: job.Headers,
+ Options: &Options{
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ },
+ }
+}
+
+// pack job metadata into headers
+func pack(id string, j *Item) (amqp.Table, error) {
+ headers, err := json.Marshal(j.Headers)
+ if err != nil {
+ return nil, err
+ }
+ return amqp.Table{
+ job.RRID: id,
+ job.RRJob: j.Job,
+ job.RRPipeline: j.Options.Pipeline,
+ job.RRHeaders: headers,
+ job.RRDelay: j.Options.Delay,
+ job.RRPriority: j.Options.Priority,
+ }, nil
+}
+
+// unpack restores the Item and its Options from the delivery headers
+func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: c.multipleAck,
+ requeue: c.requeueOnFail,
+ requeueFn: c.handleItem,
+ }}
+
+ if _, ok := d.Headers[job.RRID].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))
+ }
+
+ item.Ident = d.Headers[job.RRID].(string)
+
+ if _, ok := d.Headers[job.RRJob].(string); !ok {
+ return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob))
+ }
+
+ item.Job = d.Headers[job.RRJob].(string)
+
+ if _, ok := d.Headers[job.RRPipeline].(string); ok {
+ item.Options.Pipeline = d.Headers[job.RRPipeline].(string)
+ }
+
+ if h, ok := d.Headers[job.RRHeaders].([]byte); ok {
+ err := json.Unmarshal(h, &item.Headers)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if _, ok := d.Headers[job.RRDelay].(int64); ok {
+ item.Options.Delay = d.Headers[job.RRDelay].(int64)
+ }
+
+ if _, ok := d.Headers[job.RRPriority]; !ok {
+ // set pipe's priority
+ item.Options.Priority = c.priority
+ } else {
+ item.Options.Priority = d.Headers[job.RRPriority].(int64)
+ }
+
+ return item, nil
+}
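
As the Context comment above notes, this driver does not serialize the job context into the message body: pack() places the job metadata into AMQP headers under the job.RR* keys and the payload travels as the body, and unpack() reads the same headers back on the consuming side. A small sketch of that header layout follows; the identifiers and payload are made up for illustration.

    package main

    import (
        "fmt"

        json "github.com/json-iterator/go"
        amqp "github.com/rabbitmq/amqp091-go"
        "github.com/spiral/roadrunner/v2/plugins/jobs/job"
    )

    func main() {
        headers := map[string][]string{"trace-id": {"abc123"}}
        rawHeaders, err := json.Marshal(headers)
        if err != nil {
            panic(err)
        }

        // same shape as pack(): job metadata travels in the AMQP headers, the payload in the body
        table := amqp.Table{
            job.RRID:       "11111111-2222-3333-4444-555555555555", // hypothetical job ID
            job.RRJob:      "App\\Jobs\\SendEmail",                 // hypothetical PHP handler class
            job.RRPipeline: "amqp-pipeline",
            job.RRHeaders:  rawHeaders,
            job.RRDelay:    int64(0),
            job.RRPriority: int64(10),
        }

        // on the consuming side, unpack() reads the same keys back from amqp.Delivery.Headers
        d := amqp.Delivery{Headers: table, Body: []byte(`{"email":"[email protected]"}`)}
        fmt.Println(d.Headers[job.RRJob], len(d.Body))
    }
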
diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go
new file mode 100644
index 00000000..75c61cad
--- /dev/null
+++ b/plugins/amqp/amqpjobs/listener.go
@@ -0,0 +1,25 @@
+package amqpjobs
+
+import amqp "github.com/rabbitmq/amqp091-go"
+
+func (c *consumer) listener(deliv <-chan amqp.Delivery) {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case msg, ok := <-deliv:
+ if !ok {
+ c.log.Info("delivery channel closed, leaving the rabbit listener")
+ return
+ }
+
+ d, err := c.fromDelivery(msg)
+ if err != nil {
+ c.log.Error("amqp delivery convert", "error", err)
+ continue
+ }
+ // insert job into the main priority queue
+ c.pq.Insert(d)
+ }
+ }
+ }()
+}
diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
new file mode 100644
index 00000000..fb5f6911
--- /dev/null
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
@@ -0,0 +1,57 @@
+package amqpjobs
+
+import (
+ "github.com/spiral/errors"
+)
+
+func (c *consumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := c.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ c.exchangeName,
+ c.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ c.queue,
+ false,
+ false,
+ c.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ c.routingKey,
+ c.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return channel.Close()
+}
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
new file mode 100644
index 00000000..8d21784f
--- /dev/null
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -0,0 +1,141 @@
+package amqpjobs
+
+import (
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ amqp "github.com/rabbitmq/amqp091-go"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+)
+
+// redialer is used to redial rabbitmq in case the connection is interrupted
+func (c *consumer) redialer() { //nolint:gocognit
+ go func() {
+ const op = errors.Op("rabbitmq_redial")
+
+ for {
+ select {
+ case err := <-c.conn.NotifyClose(make(chan *amqp.Error)):
+ if err == nil {
+ return
+ }
+
+ c.Lock()
+
+ // trash the broken publishing channel
+ <-c.publishChan
+
+ t := time.Now().UTC()
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeError,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Error: err,
+ Start: time.Now().UTC(),
+ })
+
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = c.retryTimeout
+ operation := func() error {
+ c.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ var dialErr error
+ c.conn, dialErr = amqp.Dial(c.connStr)
+ if dialErr != nil {
+ return errors.E(op, dialErr)
+ }
+
+ c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+
+ // re-init connection
+ errInit := c.initRabbitMQ()
+ if errInit != nil {
+ c.log.Error("rabbitmq dial", "error", errInit)
+ return errInit
+ }
+
+ // redeclare consume channel
+ var errConnCh error
+ c.consumeChan, errConnCh = c.conn.Channel()
+ if errConnCh != nil {
+ return errors.E(op, errConnCh)
+ }
+
+ // redeclare publish channel
+ pch, errPubCh := c.conn.Channel()
+ if errPubCh != nil {
+ return errors.E(op, errPubCh)
+ }
+
+ // start reading messages from the channel
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // put the fresh publishing channel
+ c.publishChan <- pch
+ // restart listener
+ c.listener(deliv)
+
+ c.log.Info("queues and subscribers redeclared successfully")
+
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ c.Unlock()
+ c.log.Error("backoff failed", "error", retryErr)
+ return
+ }
+
+ c.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: time.Since(t),
+ })
+
+ c.Unlock()
+
+ case <-c.stopCh:
+ if c.publishChan != nil {
+ pch := <-c.publishChan
+ err := pch.Close()
+ if err != nil {
+ c.log.Error("publish channel close", "error", err)
+ }
+ }
+
+ if c.consumeChan != nil {
+ err := c.consumeChan.Close()
+ if err != nil {
+ c.log.Error("consume channel close", "error", err)
+ }
+ }
+ if c.conn != nil {
+ err := c.conn.Close()
+ if err != nil {
+ c.log.Error("amqp connection close", "error", err)
+ }
+ }
+
+ return
+ }
+ }
+ }()
+}
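
The redialer above reacts to NotifyClose and re-dials with an exponential backoff capped at retryTimeout (five minutes here). The sketch below shows that retry pattern with cenkalti/backoff/v4 detached from the driver; the address is illustrative, and the example only re-dials, it does not redeclare channels, queues, and consumers the way the redialer does.

    package main

    import (
        "log"
        "time"

        "github.com/cenkalti/backoff/v4"
        amqp "github.com/rabbitmq/amqp091-go"
    )

    // reconnect retries amqp.Dial with exponential backoff, giving up after maxElapsed,
    // mirroring the redialer's use of backoff.Retry.
    func reconnect(addr string, maxElapsed time.Duration) (*amqp.Connection, error) {
        var conn *amqp.Connection

        expb := backoff.NewExponentialBackOff()
        expb.MaxElapsedTime = maxElapsed // the driver uses its retryTimeout here

        operation := func() error {
            var err error
            conn, err = amqp.Dial(addr)
            if err != nil {
                log.Println("dial failed, retrying:", err)
                return err // a non-nil error makes backoff.Retry try again
            }
            return nil
        }

        if err := backoff.Retry(operation, expb); err != nil {
            return nil, err
        }
        return conn, nil
    }

    func main() {
        conn, err := reconnect("amqp://guest:guest@localhost:5672/", 5*time.Minute) // address is illustrative
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // block until the broker closes the connection; the driver reacts to this signal by reconnecting
        closeErr := <-conn.NotifyClose(make(chan *amqp.Error))
        log.Println("connection closed:", closeErr)
    }
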
diff --git a/plugins/amqp/plugin.go b/plugins/amqp/plugin.go
new file mode 100644
index 00000000..c4f5f1da
--- /dev/null
+++ b/plugins/amqp/plugin.go
@@ -0,0 +1,41 @@
+package amqp
+
+import (
+ "github.com/spiral/roadrunner/v2/common/jobs"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ pluginName string = "amqp"
+)
+
+type Plugin struct {
+ log logger.Logger
+ cfg config.Configurer
+}
+
+func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ p.log = log
+ p.cfg = cfg
+ return nil
+}
+
+func (p *Plugin) Name() string {
+ return pluginName
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return amqpjobs.NewAMQPConsumer(configKey, p.log, p.cfg, e, pq)
+}
+
+// FromPipeline constructs AMQP driver from pipeline
+func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) {
+ return amqpjobs.FromPipeline(pipe, p.log, p.cfg, e, pq)
+}