diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/redial.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 277e75b7..571ee548 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -2,9 +2,12 @@ package amqp import ( "fmt" + "time" "github.com/cenkalti/backoff/v4" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/streadway/amqp" ) @@ -22,6 +25,17 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit j.Lock() + t := time.Now() + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeError, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Error: err, + Start: time.Now(), + Elapsed: 0, + }) + j.log.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) @@ -85,6 +99,14 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit return } + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + j.Unlock() case <-j.stopCh: |