diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
commit | d099e47ab28dd044d34e18347a4c714b8af3d612 (patch) | |
tree | e106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/drivers/amqp | |
parent | ec7c049036d31fe030d106db9f0d268ea0296c5f (diff) |
SQS driver.
Fix isssues in the AMQP driver.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 58 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 416 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 187 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/listener.go | 25 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/plugin.go | 40 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/rabbit_init.go | 65 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 126 |
7 files changed, 917 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go new file mode 100644 index 00000000..7befb3c8 --- /dev/null +++ b/plugins/jobs/drivers/amqp/config.go @@ -0,0 +1,58 @@ +package amqp + +// pipeline rabbitmq info +const ( + exchangeKey string = "exchange" + exchangeType string = "exchange-type" + queue string = "queue" + routingKey string = "routing-key" + prefetch string = "prefetch" + exclusive string = "exclusive" + priority string = "priority" + + dlx string = "x-dead-letter-exchange" + dlxRoutingKey string = "x-dead-letter-routing-key" + dlxTTL string = "x-message-ttl" + dlxExpires string = "x-expires" + + contentType string = "application/octet-stream" +) + +type GlobalCfg struct { + Addr string `mapstructure:"addr"` +} + +// Config is used to parse pipeline configuration +type Config struct { + PrefetchCount int `mapstructure:"pipeline_size"` + Queue string `mapstructure:"queue"` + Priority int64 `mapstructure:"priority"` + Exchange string `mapstructure:"exchange"` + ExchangeType string `mapstructure:"exchange_type"` + RoutingKey string `mapstructure:"routing_key"` + Exclusive bool `mapstructure:"exclusive"` +} + +func (c *Config) InitDefault() { + if c.ExchangeType == "" { + c.ExchangeType = "direct" + } + + if c.Exchange == "" { + c.Exchange = "default" + } + + if c.PrefetchCount == 0 { + c.PrefetchCount = 100 + } + + if c.Priority == 0 { + c.Priority = 10 + } +} + +func (c *GlobalCfg) InitDefault() { + if c.Addr == "" { + c.Addr = "amqp://guest:guest@localhost:5672/" + } +} diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go new file mode 100644 index 00000000..31999e23 --- /dev/null +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -0,0 +1,416 @@ +package amqp + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/streadway/amqp" +) + +type JobsConsumer struct { + sync.Mutex + log logger.Logger + pq priorityqueue.Queue + eh events.Handler + + pipeline atomic.Value + + // amqp connection + conn *amqp.Connection + consumeChan *amqp.Channel + publishChan *amqp.Channel + + retryTimeout time.Duration + prefetchCount int + priority int64 + exchangeName string + queue string + exclusive bool + consumeID string + connStr string + exchangeType string + routingKey string + + delayCache map[string]struct{} + + stopCh chan struct{} +} + +// NewAMQPConsumer initializes rabbitmq pipeline +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer") + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + pq: pq, + eh: e, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no such key - error + if !cfg.Has(configKey) { + return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey)) + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var pipeCfg Config + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(configKey, &pipeCfg) + if err != nil { + return nil, errors.E(op, err) + } + + pipeCfg.InitDefault() + + err = cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeCfg.RoutingKey + jb.queue = pipeCfg.Queue + jb.exchangeType = pipeCfg.ExchangeType + jb.exchangeName = pipeCfg.Exchange + jb.prefetchCount = pipeCfg.PrefetchCount + jb.exclusive = pipeCfg.Exclusive + jb.priority = pipeCfg.Priority + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // run redialer for the connection + jb.redialer() + + return jb, nil +} + +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer_from_pipeline") + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + eh: e, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeline.String(routingKey, "") + jb.queue = pipeline.String(queue, "default") + jb.exchangeType = pipeline.String(exchangeType, "direct") + jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") + jb.prefetchCount = pipeline.Int(prefetch, 10) + jb.priority = int64(pipeline.Int(priority, 10)) + jb.exclusive = pipeline.Bool(exclusive, true) + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // register the pipeline + // error here is always nil + _ = jb.Register(pipeline) + + // run redialer for the connection + jb.redialer() + + return jb, nil +} + +func (j *JobsConsumer) Push(job *job.Job) error { + const op = errors.Op("rabbitmq_push") + // check if the pipeline registered + + // load atomic value + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != job.Options.Pipeline { + return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) + } + + // lock needed here to protect redial concurrent operation + // we may be in the redial state here + j.Lock() + defer j.Unlock() + + // convert + msg := fromJob(job) + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } + + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + // delay cache optimization. + // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + } + + _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + + if err != nil { + return errors.E(op, err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + j.delayCache[tmpQ] = struct{}{} + + return nil + } + + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { + j.pipeline.Store(pipeline) + return nil +} + +func (j *JobsConsumer) Run(p *pipeline.Pipeline) error { + const op = errors.Op("rabbit_consume") + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p.Name() { + return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name())) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + return errors.E(op, err) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // run listener + j.listener(deliv) + + return nil +} + +func (j *JobsConsumer) Pause(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested pause on: ", p) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + err := j.consumeChan.Cancel(j.consumeID, true) + if err != nil { + j.log.Error("cancel publish channel, forcing close", "error", err) + errCl := j.consumeChan.Close() + if errCl != nil { + j.log.Error("force close failed", "error", err) + } + } +} + +func (j *JobsConsumer) Resume(p string) { + pipe := j.pipeline.Load().(*pipeline.Pipeline) + if pipe.Name() != p { + j.log.Error("no such pipeline", "requested resume on: ", p) + } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + + var err error + j.consumeChan, err = j.conn.Channel() + if err != nil { + j.log.Error("create channel on rabbitmq connection", "error", err) + return + } + + err = j.consumeChan.Qos(j.prefetchCount, 0, false) + if err != nil { + j.log.Error("qos set failed", "error", err) + return + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + j.log.Error("consume operation failed", "error", err) + return + } + + // run listener + j.listener(deliv) +} + +func (j *JobsConsumer) Stop() error { + j.stopCh <- struct{}{} + + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + Elapsed: 0, + }) + return nil +} diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go new file mode 100644 index 00000000..7c300c88 --- /dev/null +++ b/plugins/jobs/drivers/amqp/item.go @@ -0,0 +1,187 @@ +package amqp + +import ( + "time" + + json "github.com/json-iterator/go" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/jobs/job" + "github.com/spiral/roadrunner/v2/utils" + "github.com/streadway/amqp" +) + +type Item struct { + // Job contains pluginName of job broker (usually PHP class). + Job string `json:"job"` + + // Ident is unique identifier of the job, should be provided from outside + Ident string `json:"id"` + + // Payload is string data (usually JSON) passed to Job broker. + Payload string `json:"payload"` + + // Headers with key-values pairs + Headers map[string][]string `json:"headers"` + + // Options contains set of PipelineOptions specific to job execution. Can be empty. + Options *Options `json:"options,omitempty"` + + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + AckFunc func(multiply bool) error + + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + NackFunc func(multiply bool, requeue bool) error +} + +// Options carry information about how to handle given job. +type Options struct { + // Priority is job priority, default - 10 + // pointer to distinguish 0 as a priority and nil as priority not set + Priority int64 `json:"priority"` + + // Pipeline manually specified pipeline. + Pipeline string `json:"pipeline,omitempty"` + + // Delay defines time duration to delay execution for. Defaults to none. + Delay int64 `json:"delay,omitempty"` + + // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + Timeout int64 `json:"timeout,omitempty"` +} + +// DelayDuration returns delay duration in a form of time.Duration. +func (o *Options) DelayDuration() time.Duration { + return time.Second * time.Duration(o.Delay) +} + +// TimeoutDuration returns timeout duration in a form of time.Duration. +func (o *Options) TimeoutDuration() time.Duration { + if o.Timeout == 0 { + return 30 * time.Minute + } + + return time.Second * time.Duration(o.Timeout) +} + +func (j *Item) ID() string { + return j.Ident +} + +func (j *Item) Priority() int64 { + return j.Options.Priority +} + +// Body packs job payload into binary payload. +func (j *Item) Body() []byte { + return utils.AsBytes(j.Payload) +} + +// Context packs job context (job, id) into binary payload. +// Not used in the amqp, amqp.Table used instead +func (j *Item) Context() ([]byte, error) { + return nil, nil +} + +func (j *Item) Ack() error { + return j.AckFunc(false) +} + +func (j *Item) Nack() error { + return j.NackFunc(false, false) +} + +func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { + const op = errors.Op("from_delivery_convert") + item, err := j.unpack(d) + if err != nil { + return nil, errors.E(op, err) + } + return &Item{ + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + AckFunc: d.Ack, + NackFunc: d.Nack, + }, nil +} + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} + +// pack job metadata into headers +func pack(id string, j *Item) (amqp.Table, error) { + headers, err := json.Marshal(j.Headers) + if err != nil { + return nil, err + } + return amqp.Table{ + job.RRID: id, + job.RRJob: j.Job, + job.RRPipeline: j.Options.Pipeline, + job.RRHeaders: headers, + job.RRTimeout: j.Options.Timeout, + job.RRDelay: j.Options.Delay, + job.RRPriority: j.Options.Priority, + }, nil +} + +// unpack restores jobs.Options +func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + + if _, ok := d.Headers[job.RRID].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) + } + + item.Ident = d.Headers[job.RRID].(string) + + if _, ok := d.Headers[job.RRJob].(string); !ok { + return nil, errors.E(errors.Errorf("missing header `%s`", job.RRJob)) + } + + item.Job = d.Headers[job.RRJob].(string) + + if _, ok := d.Headers[job.RRPipeline].(string); ok { + item.Options.Pipeline = d.Headers[job.RRPipeline].(string) + } + + if h, ok := d.Headers[job.RRHeaders].([]byte); ok { + err := json.Unmarshal(h, &item.Headers) + if err != nil { + return nil, err + } + } + + if _, ok := d.Headers[job.RRTimeout].(int64); ok { + item.Options.Timeout = d.Headers[job.RRTimeout].(int64) + } + + if _, ok := d.Headers[job.RRDelay].(int64); ok { + item.Options.Delay = d.Headers[job.RRDelay].(int64) + } + + if _, ok := d.Headers[job.RRPriority]; !ok { + // set pipe's priority + item.Options.Priority = j.priority + } else { + item.Options.Priority = d.Headers[job.RRPriority].(int64) + } + + return item, nil +} diff --git a/plugins/jobs/drivers/amqp/listener.go b/plugins/jobs/drivers/amqp/listener.go new file mode 100644 index 00000000..7241c717 --- /dev/null +++ b/plugins/jobs/drivers/amqp/listener.go @@ -0,0 +1,25 @@ +package amqp + +import "github.com/streadway/amqp" + +func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-deliv: + if !ok { + j.log.Info("delivery channel closed, leaving the rabbit listener") + return + } + + d, err := j.fromDelivery(msg) + if err != nil { + j.log.Error("amqp delivery convert", "error", err) + continue + } + // insert job into the main priority queue + j.pq.Insert(d) + } + } + }() +} diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/jobs/drivers/amqp/plugin.go new file mode 100644 index 00000000..624f4405 --- /dev/null +++ b/plugins/jobs/drivers/amqp/plugin.go @@ -0,0 +1,40 @@ +package amqp + +import ( + "github.com/spiral/roadrunner/v2/common/jobs" + "github.com/spiral/roadrunner/v2/pkg/events" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + pluginName string = "amqp" +) + +type Plugin struct { + log logger.Logger + cfg config.Configurer +} + +func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + p.log = log + p.cfg = cfg + return nil +} + +func (p *Plugin) Name() string { + return pluginName +} + +func (p *Plugin) Available() {} + +func (p *Plugin) JobsConstruct(configKey string, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return NewAMQPConsumer(configKey, p.log, p.cfg, e, pq) +} + +// FromPipeline constructs AMQP driver from pipeline +func (p *Plugin) FromPipeline(pipe *pipeline.Pipeline, e events.Handler, pq priorityqueue.Queue) (jobs.Consumer, error) { + return FromPipeline(pipe, p.log, p.cfg, e, pq) +} diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go new file mode 100644 index 00000000..d6b8a708 --- /dev/null +++ b/plugins/jobs/drivers/amqp/rabbit_init.go @@ -0,0 +1,65 @@ +package amqp + +import ( + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" +) + +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("jobs_plugin_rmq_init") + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. + channel, err := j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + j.exchangeName, + j.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + j.queue, + false, + false, + j.exclusive, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + j.routingKey, + j.exchangeName, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + j.eh.Push(events.JobEvent{ + Event: events.EventInitialized, + Driver: "amqp", + Start: time.Now(), + }) + return channel.Close() +} diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go new file mode 100644 index 00000000..0b52a4d1 --- /dev/null +++ b/plugins/jobs/drivers/amqp/redial.go @@ -0,0 +1,126 @@ +package amqp + +import ( + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" + "github.com/streadway/amqp" +) + +// redialer used to redial to the rabbitmq in case of the connection interrupts +func (j *JobsConsumer) redialer() { //nolint:gocognit + go func() { + const op = errors.Op("rabbitmq_redial") + + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + + t := time.Now() + pipe := j.pipeline.Load().(*pipeline.Pipeline) + j.eh.Push(events.JobEvent{ + Event: events.EventPipeError, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Error: err, + Start: time.Now(), + }) + + j.log.Error("connection closed, reconnecting", "error", err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = j.retryTimeout + op := func() error { + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + return errors.E(op, dialErr) + } + + j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + + // re-init connection + errInit := j.initRabbitMQ() + if errInit != nil { + j.log.Error("rabbitmq dial", "error", errInit) + return errInit + } + + // redeclare consume channel + var errConnCh error + j.consumeChan, errConnCh = j.conn.Channel() + if errConnCh != nil { + return errors.E(op, errConnCh) + } + + // redeclare publish channel + var errPubCh error + j.publishChan, errPubCh = j.conn.Channel() + if errPubCh != nil { + return errors.E(op, errPubCh) + } + + // start reading messages from the channel + deliv, err := j.consumeChan.Consume( + j.queue, + j.consumeID, + false, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // restart listener + j.listener(deliv) + + j.log.Info("queues and subscribers redeclared successfully") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.Unlock() + j.log.Error("backoff failed", "error", retryErr) + return + } + + j.eh.Push(events.JobEvent{ + Event: events.EventPipeActive, + Pipeline: pipe.Name(), + Driver: pipe.Driver(), + Start: t, + Elapsed: time.Since(t), + }) + + j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } + } + } + }() +} |