diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 20:09:01 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-02 20:09:01 +0300 |
commit | 6749db4a2d39fa70b426bcf50edf66a176c07f57 (patch) | |
tree | f6f92c9f0016f6bcac6a9aa45ccc961eebf90018 /plugins/sqs/consumer.go | |
parent | 0437d1f58514f694ea86e8176e621c009cd510f9 (diff) | |
parent | 4524f8c5af045ed5048250b63b7859eaeb4f24a1 (diff) |
#778: feat(refactor): jobs code adjustingv2.4.0
#778: feat(refactor): jobs code adjusting
Diffstat (limited to 'plugins/sqs/consumer.go')
-rw-r--r-- | plugins/sqs/consumer.go | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/plugins/sqs/consumer.go b/plugins/sqs/consumer.go index dfbda154..92dbd6a8 100644 --- a/plugins/sqs/consumer.go +++ b/plugins/sqs/consumer.go @@ -298,6 +298,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("sqs_run") c.Lock() @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.pauseCh <- struct{}{} } @@ -333,12 +336,14 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -362,11 +367,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -391,7 +398,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |