summaryrefslogtreecommitdiff
path: root/plugins/sqs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/sqs/consumer.go')
-rw-r--r--plugins/sqs/consumer.go16
1 files changed, 12 insertions, 4 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go
index dfbda154..92dbd6a8 100644
--- a/plugins/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
@@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error {
}
func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
+ start := time.Now()
const op = errors.Op("sqs_run")
c.Lock()
@@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Stop(context.Context) error {
+ start := time.Now()
if atomic.LoadUint32(&c.listeners) > 0 {
c.pauseCh <- struct{}{}
}
@@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (c *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (c *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := c.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}