summaryrefslogtreecommitdiff
path: root/plugins/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/beanstalk/consumer.go')
-rw-r--r--plugins/beanstalk/consumer.go18
1 files changed, 13 insertions, 5 deletions
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index dc2a7e91..30807f03 100644
--- a/plugins/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
@@ -266,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) {
func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
- // check if the pipeline registered
+ start := time.Now()
// load atomic value
+ // check if the pipeline registered
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p.Name() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name()))
@@ -282,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Stop(context.Context) error {
+ start := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -299,13 +302,15 @@ func (j *consumer) Stop(context.Context) error {
Event: events.EventPipeStopped,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
return nil
}
func (j *consumer) Pause(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -328,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) {
Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}
func (j *consumer) Resume(_ context.Context, p string) {
+ start := time.Now()
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -357,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) {
Event: events.EventPipeActive,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
- Start: time.Now(),
+ Start: start,
+ Elapsed: time.Since(start),
})
}