diff options
Diffstat (limited to 'plugins/amqp')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 22 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go | 7 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 21 |
3 files changed, 28 insertions, 22 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,22 +420,25 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + start := time.Now() + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return |