diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 16:52:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-02 16:52:54 +0300 |
commit | 74c327a86e48ccc9d58833fce994ea134169d0a9 (patch) | |
tree | 8e373215eb8dbfbd57c60ca4d4960761537dc4af /plugins/beanstalk | |
parent | 66f069f092568585e7b2a118303a20a598948fd7 (diff) |
Profiling session fixes:
- Drain local pipeline channel
- sync.Map instead of map
- Add start-elapsed timings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/beanstalk')
-rw-r--r-- | plugins/beanstalk/consumer.go | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/plugins/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index dc2a7e91..30807f03 100644 --- a/plugins/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go @@ -266,9 +266,10 @@ func (j *consumer) State(ctx context.Context) (*jobState.State, error) { func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") - // check if the pipeline registered + start := time.Now() // load atomic value + // check if the pipeline registered pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", p.Name(), pipe.Name())) @@ -282,13 +283,15 @@ func (j *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Stop(context.Context) error { + start := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -299,13 +302,15 @@ func (j *consumer) Stop(context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (j *consumer) Pause(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -328,11 +333,13 @@ func (j *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (j *consumer) Resume(_ context.Context, p string) { + start := time.Now() // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -357,7 +364,8 @@ func (j *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |