diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
commit | d099e47ab28dd044d34e18347a4c714b8af3d612 (patch) | |
tree | e106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/drivers/amqp/redial.go | |
parent | ec7c049036d31fe030d106db9f0d268ea0296c5f (diff) |
SQS driver.
Fix isssues in the AMQP driver.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/redial.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go new file mode 100644 index 00000000..0b52a4d1 --- /dev/null +++ b/plugins/jobs/drivers/amqp/redial.go @@ -0,0 +1,126 @@ +package amqp + +import ( + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/streadway/amqp" +) + +// redialer used to redial to the rabbitmq in case of the connection interrupts +func (j *JobsConsumer) redialer() { //nolint:gocognit + go func() { + const op = errors.Op("rabbitmq_redial") + + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + + t := time.Now() + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeError, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Error: err, + Start: time.Now(), + }) + + j.log.Error("connection closed, reconnecting", "error", err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = j.retryTimeout + op := func() error { + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + return errors.E(op, dialErr) + } + + j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + + // re-init connection + errInit := j.initRabbitMQ() + if errInit != nil { + j.log.Error("rabbitmq dial", "error", errInit) + return errInit + } + + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + var errPubCh error + j.publishChan, errPubCh = j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // restart listener + j.listener(deliv) + + j.log.Info("queues and subscribers redeclared successfully") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.Unlock() + j.log.Error("backoff failed", "error", retryErr) + return + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + + j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } + } + } + }() +} |