summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/consumer.go')
-rw-r--r--plugins/amqp/amqpjobs/consumer.go146
1 files changed, 74 insertions, 72 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 1931ceaa..f1b4d54f 100644
--- a/plugins/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
@@ -218,17 +218,17 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *consumer) Push(ctx context.Context, job *job.Job) error {
+func (c *consumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
// load atomic value
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != job.Options.Pipeline {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name()))
}
- err := j.handleItem(ctx, fromJob(job))
+ err := c.handleItem(ctx, fromJob(job))
if err != nil {
return errors.E(op, err)
}
@@ -236,38 +236,38 @@ func (j *consumer) Push(ctx context.Context, job *job.Job) error {
return nil
}
-func (j *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
- j.pipeline.Store(p)
+func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ c.pipeline.Store(p)
return nil
}
-func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
- const op = errors.Op("rabbit_consume")
+func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ const op = errors.Op("rabbit_run")
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -279,9 +279,11 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
- j.eh.Push(events.JobEvent{
+ atomic.StoreUint32(&c.listeners, 1)
+
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -291,28 +293,28 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
return nil
}
-func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
+func (c *consumer) State(ctx context.Context) (*jobState.State, error) {
const op = errors.Op("amqp_driver_state")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
- q, err := pch.QueueInspect(j.queue)
+ q, err := pch.QueueInspect(c.queue)
if err != nil {
return nil, errors.E(op, err)
}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: q.Name,
Active: int64(q.Messages),
- Delayed: atomic.LoadInt64(j.delayed),
- Ready: ready(atomic.LoadUint32(&j.listeners)),
+ Delayed: atomic.LoadInt64(c.delayed),
+ Ready: ready(atomic.LoadUint32(&c.listeners)),
}, nil
case <-ctx.Done():
@@ -320,37 +322,37 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
}
}
-func (j *consumer) Pause(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Pause(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested pause on: ", p)
+ c.log.Error("no such pipeline", "requested pause on: ", p)
}
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 0 {
- j.log.Warn("no active listeners, nothing to pause")
+ c.log.Warn("no active listeners, nothing to pause")
return
}
- atomic.AddUint32(&j.listeners, ^uint32(0))
+ atomic.AddUint32(&c.listeners, ^uint32(0))
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- err := j.consumeChan.Cancel(j.consumeID, true)
+ err := c.consumeChan.Cancel(c.consumeID, true)
if err != nil {
- j.log.Error("cancel publish channel, forcing close", "error", err)
- errCl := j.consumeChan.Close()
+ c.log.Error("cancel publish channel, forcing close", "error", err)
+ errCl := c.consumeChan.Close()
if errCl != nil {
- j.log.Error("force close failed", "error", err)
+ c.log.Error("force close failed", "error", err)
return
}
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -358,40 +360,40 @@ func (j *consumer) Pause(_ context.Context, p string) {
})
}
-func (j *consumer) Resume(_ context.Context, p string) {
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+func (c *consumer) Resume(_ context.Context, p string) {
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
- j.log.Error("no such pipeline", "requested resume on: ", p)
+ c.log.Error("no such pipeline", "requested resume on: ", p)
}
// protect connection (redial)
- j.Lock()
- defer j.Unlock()
+ c.Lock()
+ defer c.Unlock()
- l := atomic.LoadUint32(&j.listeners)
+ l := atomic.LoadUint32(&c.listeners)
// no active listeners
if l == 1 {
- j.log.Warn("amqp listener already in the active state")
+ c.log.Warn("amqp listener already in the active state")
return
}
var err error
- j.consumeChan, err = j.conn.Channel()
+ c.consumeChan, err = c.conn.Channel()
if err != nil {
- j.log.Error("create channel on rabbitmq connection", "error", err)
+ c.log.Error("create channel on rabbitmq connection", "error", err)
return
}
- err = j.consumeChan.Qos(j.prefetch, 0, false)
+ err = c.consumeChan.Qos(c.prefetch, 0, false)
if err != nil {
- j.log.Error("qos set failed", "error", err)
+ c.log.Error("qos set failed", "error", err)
return
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -399,17 +401,17 @@ func (j *consumer) Resume(_ context.Context, p string) {
nil,
)
if err != nil {
- j.log.Error("consume operation failed", "error", err)
+ c.log.Error("consume operation failed", "error", err)
return
}
// run listener
- j.listener(deliv)
+ c.listener(deliv)
// increase number of listeners
- atomic.AddUint32(&j.listeners, 1)
+ atomic.AddUint32(&c.listeners, 1)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -417,11 +419,11 @@ func (j *consumer) Resume(_ context.Context, p string) {
})
}
-func (j *consumer) Stop(context.Context) error {
- j.stopCh <- struct{}{}
+func (c *consumer) Stop(context.Context) error {
+ c.stopCh <- struct{}{}
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
@@ -431,13 +433,13 @@ func (j *consumer) Stop(context.Context) error {
}
// handleItem
-func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
+func (c *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
select {
- case pch := <-j.publishChan:
+ case pch := <-c.publishChan:
// return the channel back
defer func() {
- j.publishChan <- pch
+ c.publishChan <- pch
}()
// convert
@@ -449,30 +451,30 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
// handle timeouts
if msg.Options.DelayDuration() > 0 {
- atomic.AddInt64(j.delayed, 1)
+ atomic.AddInt64(c.delayed, 1)
// TODO declare separate method for this if condition
// TODO dlx cache channel??
delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, c.exchangeName, c.queue)
_, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
+ dlx: c.exchangeName,
+ dlxRoutingKey: c.routingKey,
dlxTTL: delayMs,
dlxExpires: delayMs * 2,
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ err = pch.QueueBind(tmpQ, tmpQ, c.exchangeName, false, nil)
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
@@ -481,7 +483,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
})
if err != nil {
- atomic.AddInt64(j.delayed, ^int64(0))
+ atomic.AddInt64(c.delayed, ^int64(0))
return errors.E(op, err)
}
@@ -489,7 +491,7 @@ func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
}
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ err = pch.Publish(c.exchangeName, c.routingKey, false, false, amqp.Publishing{
Headers: table,
ContentType: contentType,
Timestamp: time.Now(),