diff options
Diffstat (limited to 'plugins/amqp')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 146 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go | 18 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go | 10 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go | 16 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 68 |
5 files changed, 130 insertions, 128 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1931ceaa..f1b4d54f 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -218,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *consumer) Push(ctx context.Context, job *job.Job) error { +func (c *consumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered // load atomic value - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != job.Options.Pipeline { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - err := j.handleItem(ctx, fromJob(job)) + err := c.handleItem(ctx, fromJob(job)) if err != nil { return errors.E(op, err) } @@ -236,38 +236,38 @@ func (j *consumer) Push(ctx context.Context, job *job.Job) error { return nil } -func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { - j.pipeline.Store(p) +func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { + c.pipeline.Store(p) return nil } -func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { - const op = errors.Op("rabbit_consume") +func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_run") - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { return errors.E(op, err) } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { return errors.E(op, err) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -279,9 +279,11 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { } // run listener - j.listener(deliv) + c.listener(deliv) - j.eh.Push(events.JobEvent{ + atomic.StoreUint32(&c.listeners, 1) + + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -291,28 +293,28 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { return nil } -func (j *consumer) State(ctx context.Context) (*jobState.State, error) { +func (c *consumer) State(ctx context.Context) (*jobState.State, error) { const op = errors.Op("amqp_driver_state") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: defer func() { - j.publishChan <- pch + c.publishChan <- pch }() - q, err := pch.QueueInspect(j.queue) + q, err := pch.QueueInspect(c.queue) if err != nil { return nil, errors.E(op, err) } - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: q.Name, Active: int64(q.Messages), - Delayed: atomic.LoadInt64(j.delayed), - Ready: ready(atomic.LoadUint32(&j.listeners)), + Delayed: atomic.LoadInt64(c.delayed), + Ready: ready(atomic.LoadUint32(&c.listeners)), }, nil case <-ctx.Done(): @@ -320,37 +322,37 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { } } -func (j *consumer) Pause(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Pause(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested pause on: ", p) + c.log.Error("no such pipeline", "requested pause on: ", p) } - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 0 { - j.log.Warn("no active listeners, nothing to pause") + c.log.Warn("no active listeners, nothing to pause") return } - atomic.AddUint32(&j.listeners, ^uint32(0)) + atomic.AddUint32(&c.listeners, ^uint32(0)) // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - err := j.consumeChan.Cancel(j.consumeID, true) + err := c.consumeChan.Cancel(c.consumeID, true) if err != nil { - j.log.Error("cancel publish channel, forcing close", "error", err) - errCl := j.consumeChan.Close() + c.log.Error("cancel publish channel, forcing close", "error", err) + errCl := c.consumeChan.Close() if errCl != nil { - j.log.Error("force close failed", "error", err) + c.log.Error("force close failed", "error", err) return } return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -358,40 +360,40 @@ func (j *consumer) Pause(_ context.Context, p string) { }) } -func (j *consumer) Resume(_ context.Context, p string) { - pipe := j.pipeline.Load().(*pipeline.Pipeline) +func (c *consumer) Resume(_ context.Context, p string) { + pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { - j.log.Error("no such pipeline", "requested resume on: ", p) + c.log.Error("no such pipeline", "requested resume on: ", p) } // protect connection (redial) - j.Lock() - defer j.Unlock() + c.Lock() + defer c.Unlock() - l := atomic.LoadUint32(&j.listeners) + l := atomic.LoadUint32(&c.listeners) // no active listeners if l == 1 { - j.log.Warn("amqp listener already in the active state") + c.log.Warn("amqp listener already in the active state") return } var err error - j.consumeChan, err = j.conn.Channel() + c.consumeChan, err = c.conn.Channel() if err != nil { - j.log.Error("create channel on rabbitmq connection", "error", err) + c.log.Error("create channel on rabbitmq connection", "error", err) return } - err = j.consumeChan.Qos(j.prefetch, 0, false) + err = c.consumeChan.Qos(c.prefetch, 0, false) if err != nil { - j.log.Error("qos set failed", "error", err) + c.log.Error("qos set failed", "error", err) return } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -399,17 +401,17 @@ func (j *consumer) Resume(_ context.Context, p string) { nil, ) if err != nil { - j.log.Error("consume operation failed", "error", err) + c.log.Error("consume operation failed", "error", err) return } // run listener - j.listener(deliv) + c.listener(deliv) // increase number of listeners - atomic.AddUint32(&j.listeners, 1) + atomic.AddUint32(&c.listeners, 1) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -417,11 +419,11 @@ func (j *consumer) Resume(_ context.Context, p string) { }) } -func (j *consumer) Stop(context.Context) error { - j.stopCh <- struct{}{} +func (c *consumer) Stop(context.Context) error { + c.stopCh <- struct{}{} - pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), @@ -431,13 +433,13 @@ func (j *consumer) Stop(context.Context) error { } // handleItem -func (j *consumer) handleItem(ctx context.Context, msg *Item) error { +func (c *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") select { - case pch := <-j.publishChan: + case pch := <-c.publishChan: // return the channel back defer func() { - j.publishChan <- pch + c.publishChan <- pch }() // convert @@ -449,30 +451,30 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("rabbitmq_handle_item") // handle timeouts if msg.Options.DelayDuration() > 0 { - atomic.AddInt64(j.delayed, 1) + atomic.AddInt64(c.delayed, 1) // TODO declare separate method for this if condition // TODO dlx cache channel?? delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue) _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, + dlx: c.exchangeName, + dlxRoutingKey: c.routingKey, dlxTTL: delayMs, dlxExpires: delayMs * 2, }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), @@ -481,7 +483,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { }) if err != nil { - atomic.AddInt64(j.delayed, ^int64(0)) + atomic.AddInt64(c.delayed, ^int64(0)) return errors.E(op, err) } @@ -489,7 +491,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error { } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now(), diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index a8e305ea..66b70a36 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - item, err := j.unpack(d) + item, err := c.unpack(d) if err != nil { return nil, errors.E(op, err) } @@ -156,10 +156,10 @@ func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - item.Options.delayed = j.delayed + item.Options.delayed = c.delayed // requeue func - item.Options.requeueFn = j.handleItem + item.Options.requeueFn = c.handleItem return i, nil } @@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { +func (c *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ - multipleAsk: j.multipleAck, - requeue: j.requeueOnFail, - requeueFn: j.handleItem, + multipleAsk: c.multipleAck, + requeue: c.requeueOnFail, + requeueFn: c.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { @@ -230,7 +230,7 @@ func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { if _, ok := d.Headers[job.RRPriority]; !ok { // set pipe's priority - item.Options.Priority = j.priority + item.Options.Priority = c.priority } else { item.Options.Priority = d.Headers[job.RRPriority].(int64) } diff --git a/plugins/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go index 0156d55c..75c61cad 100644 --- a/plugins/amqp/amqpjobs/listener.go +++ b/plugins/amqp/amqpjobs/listener.go @@ -2,23 +2,23 @@ package amqpjobs import amqp "github.com/rabbitmq/amqp091-go" -func (j *consumer) listener(deliv <-chan amqp.Delivery) { +func (c *consumer) listener(deliv <-chan amqp.Delivery) { go func() { for { //nolint:gosimple select { case msg, ok := <-deliv: if !ok { - j.log.Info("delivery channel closed, leaving the rabbit listener") + c.log.Info("delivery channel closed, leaving the rabbit listener") return } - d, err := j.fromDelivery(msg) + d, err := c.fromDelivery(msg) if err != nil { - j.log.Error("amqp delivery convert", "error", err) + c.log.Error("amqp delivery convert", "error", err) continue } // insert job into the main priority queue - j.pq.Insert(d) + c.pq.Insert(d) } } }() diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go index e260fabe..fb5f6911 100644 --- a/plugins/amqp/amqpjobs/rabbit_init.go +++ b/plugins/amqp/amqpjobs/rabbit_init.go @@ -4,20 +4,20 @@ import ( "github.com/spiral/errors" ) -func (j *consumer) initRabbitMQ() error { +func (c *consumer) initRabbitMQ() error { const op = errors.Op("jobs_plugin_rmq_init") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. - channel, err := j.conn.Channel() + channel, err := c.conn.Channel() if err != nil { return errors.E(op, err) } // declare an exchange (idempotent operation) err = channel.ExchangeDeclare( - j.exchangeName, - j.exchangeType, + c.exchangeName, + c.exchangeType, true, false, false, @@ -30,10 +30,10 @@ func (j *consumer) initRabbitMQ() error { // verify or declare a queue q, err := channel.QueueDeclare( - j.queue, + c.queue, false, false, - j.exclusive, + c.exclusive, false, nil, ) @@ -44,8 +44,8 @@ func (j *consumer) initRabbitMQ() error { // bind queue to the exchange err = channel.QueueBind( q.Name, - j.routingKey, - j.exchangeName, + c.routingKey, + c.exchangeName, false, nil, ) diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 0835e3ea..56142e2b 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -11,26 +11,26 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *consumer) redialer() { //nolint:gocognit +func (c *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") for { select { - case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + case err := <-c.conn.NotifyClose(make(chan *amqp.Error)): if err == nil { return } - j.Lock() + c.Lock() // trash the broken publishing channel - <-j.publishChan + <-c.publishChan t := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeError, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -40,41 +40,41 @@ func (j *consumer) redialer() { //nolint:gocognit expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = j.retryTimeout + expb.MaxElapsedTime = c.retryTimeout operation := func() error { - j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + c.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error - j.conn, dialErr = amqp.Dial(j.connStr) + c.conn, dialErr = amqp.Dial(c.connStr) if dialErr != nil { return errors.E(op, dialErr) } - j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - errInit := j.initRabbitMQ() + errInit := c.initRabbitMQ() if errInit != nil { - j.log.Error("rabbitmq dial", "error", errInit) + c.log.Error("rabbitmq dial", "error", errInit) return errInit } // redeclare consume channel var errConnCh error - j.consumeChan, errConnCh = j.conn.Channel() + c.consumeChan, errConnCh = c.conn.Channel() if errConnCh != nil { return errors.E(op, errConnCh) } // redeclare publish channel - pch, errPubCh := j.conn.Channel() + pch, errPubCh := c.conn.Channel() if errPubCh != nil { return errors.E(op, errPubCh) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -86,23 +86,23 @@ func (j *consumer) redialer() { //nolint:gocognit } // put the fresh publishing channel - j.publishChan <- pch + c.publishChan <- pch // restart listener - j.listener(deliv) + c.listener(deliv) - j.log.Info("queues and subscribers redeclared successfully") + c.log.Info("queues and subscribers redeclared successfully") return nil } retryErr := backoff.Retry(operation, expb) if retryErr != nil { - j.Unlock() - j.log.Error("backoff failed", "error", retryErr) + c.Unlock() + c.log.Error("backoff failed", "error", retryErr) return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -110,27 +110,27 @@ func (j *consumer) redialer() { //nolint:gocognit Elapsed: time.Since(t), }) - j.Unlock() + c.Unlock() - case <-j.stopCh: - if j.publishChan != nil { - pch := <-j.publishChan + case <-c.stopCh: + if c.publishChan != nil { + pch := <-c.publishChan err := pch.Close() if err != nil { - j.log.Error("publish channel close", "error", err) + c.log.Error("publish channel close", "error", err) } } - if j.consumeChan != nil { - err := j.consumeChan.Close() + if c.consumeChan != nil { + err := c.consumeChan.Close() if err != nil { - j.log.Error("consume channel close", "error", err) + c.log.Error("consume channel close", "error", err) } } - if j.conn != nil { - err := j.conn.Close() + if c.conn != nil { + err := c.conn.Close() if err != nil { - j.log.Error("amqp connection close", "error", err) + c.log.Error("amqp connection close", "error", err) } } |