summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/consumer.go')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go524
1 files changed, 0 insertions, 524 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
deleted file mode 100644
index 2ff0a40a..00000000
--- a/plugins/amqp/amqpjobs/consumer.go
+++ /dev/null
@@ -1,524 +0,0 @@
-package amqpjobs
-
-import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/google/uuid"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
- jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/job"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- pluginName string = "amqp"
-)
-
-type consumer struct {
- sync.Mutex
- log logger.Logger
- pq priorityqueue.Queue
- eh events.Handler
-
- pipeline atomic.Value
-
- // amqp connection
- conn *amqp.Connection
- consumeChan *amqp.Channel
- publishChan chan *amqp.Channel
- consumeID string
- connStr string
-
- retryTimeout time.Duration
- //
- // prefetch QoS AMQP
- //
- prefetch int
- //
- // pipeline's priority
- //
- priority int64
- exchangeName string
- queue string
- exclusive bool
- exchangeType string
- routingKey string
- multipleAck bool
- requeueOnFail bool
-
- listeners uint32
- delayed *int64
- stopCh chan struct{}
-}
-
-// NewAMQPConsumer initializes rabbitmq pipeline
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_amqp_consumer")
- // we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp pluginName
- // second part - queues and other pipeline information
- // if no such key - error
- if !cfg.Has(configKey) {
- return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
- }
-
- // if no global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
- }
-
- // PARSE CONFIGURATION START -------
- var pipeCfg Config
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(configKey, &pipeCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipeCfg.InitDefault()
-
- err = cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
- // PARSE CONFIGURATION END -------
-
- jb := &consumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- priority: pipeCfg.Priority,
- delayed: utils.Int64(0),
-
- publishChan: make(chan *amqp.Channel, 1),
- routingKey: pipeCfg.RoutingKey,
- queue: pipeCfg.Queue,
- exchangeType: pipeCfg.ExchangeType,
- exchangeName: pipeCfg.Exchange,
- prefetch: pipeCfg.Prefetch,
- exclusive: pipeCfg.Exclusive,
- multipleAck: pipeCfg.MultipleAck,
- requeueOnFail: pipeCfg.RequeueOnFail,
- }
-
- jb.conn, err = amqp.Dial(globalCfg.Addr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save address
- jb.connStr = globalCfg.Addr
-
- err = jb.initRabbitMQ()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pch, err := jb.conn.Channel()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- jb.publishChan <- pch
-
- // run redialer and requeue listener for the connection
- jb.redialer()
-
- return jb, nil
-}
-
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*consumer, error) {
- const op = errors.Op("new_amqp_consumer_from_pipeline")
- // we need to obtain two parts of the amqp information here.
- // firs part - address to connect, it is located in the global section under the amqp pluginName
- // second part - queues and other pipeline information
-
- // only global section
- if !cfg.Has(pluginName) {
- return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
- }
-
- // PARSE CONFIGURATION -------
- var globalCfg GlobalCfg
-
- err := cfg.UnmarshalKey(pluginName, &globalCfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- globalCfg.InitDefault()
-
- // PARSE CONFIGURATION -------
-
- jb := &consumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayed: utils.Int64(0),
-
- publishChan: make(chan *amqp.Channel, 1),
- routingKey: pipeline.String(routingKey, ""),
- queue: pipeline.String(queue, "default"),
- exchangeType: pipeline.String(exchangeType, "direct"),
- exchangeName: pipeline.String(exchangeKey, "amqp.default"),
- prefetch: pipeline.Int(prefetch, 10),
- priority: int64(pipeline.Int(priority, 10)),
- exclusive: pipeline.Bool(exclusive, false),
- multipleAck: pipeline.Bool(multipleAsk, false),
- requeueOnFail: pipeline.Bool(requeueOnFail, false),
- }
-
- jb.conn, err = amqp.Dial(globalCfg.Addr)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save address
- jb.connStr = globalCfg.Addr
-
- err = jb.initRabbitMQ()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pch, err := jb.conn.Channel()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- jb.publishChan <- pch
-
- // register the pipeline
- // error here is always nil
- _ = jb.Register(context.Background(), pipeline)
-
- // run redialer for the connection
- jb.redialer()
-
- return jb, nil
-}
-
-func (c *consumer) Push(ctx context.Context, job *job.Job) error {
- const op = errors.Op("rabbitmq_push")
- // check if the pipeline registered
-
- // load atomic value
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != job.Options.Pipeline {
- return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
- }
-
- err := c.handleItem(ctx, fromJob(job))
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- c.pipeline.Store(p)
- return nil
-}
-
-func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- start := time.Now()
- const op = errors.Op("rabbit_run")
-
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p.Name() {
- return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
- }
-
- // protect connection (redial)
- c.Lock()
- defer c.Unlock()
-
- var err error
- c.consumeChan, err = c.conn.Channel()
- if err != nil {
- return errors.E(op, err)
- }
-
- err = c.consumeChan.Qos(c.prefetch, 0, false)
- if err != nil {
- return errors.E(op, err)
- }
-
- // start reading messages from the channel
- deliv, err := c.consumeChan.Consume(
- c.queue,
- c.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // run listener
- c.listener(deliv)
-
- atomic.StoreUint32(&c.listeners, 1)
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- return nil
-}
-
-func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
- const op = errors.Op("amqp_driver_state")
- select {
- case pch := <-c.publishChan:
- defer func() {
- c.publishChan <- pch
- }()
-
- q, err := pch.QueueInspect(c.queue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
-
- return &jobState.State{
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Queue: q.Name,
- Active: int64(q.Messages),
- Delayed: atomic.LoadInt64(c.delayed),
- Ready: ready(atomic.LoadUint32(&c.listeners)),
- }, nil
-
- case <-ctx.Done():
- return nil, errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
-func (c *consumer) Pause(_ context.Context, p string) {
- start := time.Now()
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- c.log.Error("no such pipeline", "requested pause on: ", p)
- }
-
- l := atomic.LoadUint32(&c.listeners)
- // no active listeners
- if l == 0 {
- c.log.Warn("no active listeners, nothing to pause")
- return
- }
-
- atomic.AddUint32(&c.listeners, ^uint32(0))
-
- // protect connection (redial)
- c.Lock()
- defer c.Unlock()
-
- err := c.consumeChan.Cancel(c.consumeID, true)
- if err != nil {
- c.log.Error("cancel publish channel, forcing close", "error", err)
- errCl := c.consumeChan.Close()
- if errCl != nil {
- c.log.Error("force close failed", "error", err)
- return
- }
- return
- }
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipePaused,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-}
-
-func (c *consumer) Resume(_ context.Context, p string) {
- start := time.Now()
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
- if pipe.Name() != p {
- c.log.Error("no such pipeline", "requested resume on: ", p)
- }
-
- // protect connection (redial)
- c.Lock()
- defer c.Unlock()
-
- l := atomic.LoadUint32(&c.listeners)
- // no active listeners
- if l == 1 {
- c.log.Warn("amqp listener already in the active state")
- return
- }
-
- var err error
- c.consumeChan, err = c.conn.Channel()
- if err != nil {
- c.log.Error("create channel on rabbitmq connection", "error", err)
- return
- }
-
- err = c.consumeChan.Qos(c.prefetch, 0, false)
- if err != nil {
- c.log.Error("qos set failed", "error", err)
- return
- }
-
- // start reading messages from the channel
- deliv, err := c.consumeChan.Consume(
- c.queue,
- c.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- c.log.Error("consume operation failed", "error", err)
- return
- }
-
- // run listener
- c.listener(deliv)
-
- // increase number of listeners
- atomic.AddUint32(&c.listeners, 1)
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-}
-
-func (c *consumer) Stop(context.Context) error {
- start := time.Now()
- c.stopCh <- struct{}{}
-
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Driver: pipe.Driver(),
- Pipeline: pipe.Name(),
- Start: start,
- Elapsed: time.Since(start),
- })
-
- return nil
-}
-
-// handleItem
-func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
- const op = errors.Op("rabbitmq_handle_item")
- select {
- case pch := <-c.publishChan:
- // return the channel back
- defer func() {
- c.publishChan <- pch
- }()
-
- // convert
- table, err := pack(msg.ID(), msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- const op = errors.Op("rabbitmq_handle_item")
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- atomic.AddInt64(c.delayed, 1)
- // TODO declare separate method for this if condition
- // TODO dlx cache channel??
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue)
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: c.exchangeName,
- dlxRoutingKey: c.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
- if err != nil {
- atomic.AddInt64(c.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil)
- if err != nil {
- atomic.AddInt64(c.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- atomic.AddInt64(c.delayed, ^int64(0))
- return errors.E(op, err)
- }
-
- return nil
- }
-
- // insert to the local, limited pipeline
- err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{
- Headers: table,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
- }
-}
-
-func ready(r uint32) bool {
- return r > 0
-}