diff options
Diffstat (limited to 'plugins/amqp/amqpjobs')
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/plugins/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1bfc4b41..2ff0a40a 100644 --- a/plugins/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go @@ -242,6 +242,7 @@ func (c *consumer) Register(_ context.Context, p *pipeline.Pipeline) error { } func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { + start := time.Now() const op = errors.Op("rabbit_run") pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -287,7 +288,8 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil @@ -323,6 +325,7 @@ func (c *consumer) State(ctx context.Context) (*jobState.State, error) { } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -356,11 +359,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -415,11 +420,13 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Stop(context.Context) error { + start := time.Now() c.stopCh <- struct{}{} pipe := c.pipeline.Load().(*pipeline.Pipeline) @@ -428,7 +435,8 @@ func (c *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil |