From d62acca114a9646afed6ec0217b8cb709687aeb9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 1 Sep 2021 16:50:41 +0300 Subject: Close connection in the amqp driver. bytes.Buffer update in the beanstalk driver Signed-off-by: Valery Piashchynski --- plugins/amqp/amqpjobs/consumer.go | 6 +++--- plugins/amqp/amqpjobs/item.go | 7 ++++--- plugins/amqp/amqpjobs/redial.go | 21 +++++++++------------ 3 files changed, 16 insertions(+), 18 deletions(-) (limited to 'plugins/amqp') diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 784a102c..1bfc4b41 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -420,17 +420,17 @@ func (c *consumer) Resume(_ context.Context, p string) { } func (c *consumer) Stop(context.Context) error { - if atomic.LoadUint32(&c.listeners) > 0 { - c.stopCh <- struct{}{} - } + c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), }) + return nil } diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index 04385afe..b837ff86 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -43,17 +43,18 @@ type Options struct { Delay int64 `json:"delay,omitempty"` // private - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + // ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery ack func(multiply bool) error - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error // requeueFn used as a pointer to the push function - requeueFn func(context.Context, *Item) error + requeueFn func(context.Context, *Item) error + // delayed jobs TODO(rustatian): figure out how to get stats from the DLX delayed *int64 multipleAsk bool requeue bool diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 8d21784f..698a34a6 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -113,25 +113,22 @@ func (c *consumer) redialer() { //nolint:gocognit c.Unlock() case <-c.stopCh: - if c.publishChan != nil { - pch := <-c.publishChan - err := pch.Close() - if err != nil { - c.log.Error("publish channel close", "error", err) - } + pch := <-c.publishChan + err := pch.Close() + if err != nil { + c.log.Error("publish channel close", "error", err) } if c.consumeChan != nil { - err := c.consumeChan.Close() + err = c.consumeChan.Close() if err != nil { c.log.Error("consume channel close", "error", err) } } - if c.conn != nil { - err := c.conn.Close() - if err != nil { - c.log.Error("amqp connection close", "error", err) - } + + err = c.conn.Close() + if err != nil { + c.log.Error("amqp connection close", "error", err) } return -- cgit v1.2.3 From 74c327a86e48ccc9d58833fce994ea134169d0a9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 2 Sep 2021 16:52:54 +0300 Subject: Profiling session fixes: - Drain local pipeline channel - sync.Map instead of map - Add start-elapsed timings Signed-off-by: Valery Piashchynski --- plugins/amqp/amqpjobs/consumer.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'plugins/amqp') diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1bfc4b41..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,11 +420,13 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { + start := time.Now() c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -428,7 +435,8 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil -- cgit v1.2.3