summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/redial.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/redial.go')
-rw-r--r--plugins/jobs/brokers/amqp/redial.go22
1 files changed, 22 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
index 277e75b7..571ee548 100644
--- a/plugins/jobs/brokers/amqp/redial.go
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -2,9 +2,12 @@ package amqp
import (
"fmt"
+ "time"
"github.com/cenkalti/backoff/v4"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/streadway/amqp"
)
@@ -22,6 +25,17 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
j.Lock()
+ t := time.Now()
+ pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeError,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Error: err,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+
j.log.Error("connection closed, reconnecting", "error", err)
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
@@ -85,6 +99,14 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
return
}
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeActive,
+ Pipeline: pipe.Name(),
+ Driver: pipe.Driver(),
+ Start: t,
+ Elapsed: time.Since(t),
+ })
+
j.Unlock()
case <-j.stopCh: