diff options
Diffstat (limited to 'plugins/boltdb/boltjobs/consumer.go')
-rw-r--r-- | plugins/boltdb/boltjobs/consumer.go | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/plugins/boltdb/boltjobs/consumer.go b/plugins/boltdb/boltjobs/consumer.go index ed0eda61..62045d3b 100644 --- a/plugins/boltdb/boltjobs/consumer.go +++ b/plugins/boltdb/boltjobs/consumer.go @@ -222,7 +222,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return &consumer{ file: pipeline.String(file, rrDB), priority: pipeline.Int(priority, 10), - prefetch: pipeline.Int(prefetch, 100), + prefetch: pipeline.Int(prefetch, 1000), permissions: conf.Permissions, bPool: sync.Pool{New: func() interface{} { @@ -300,6 +300,7 @@ func (c *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) erro func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("boltdb_run") + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p.Name() { @@ -317,13 +318,15 @@ func (c *consumer) Run(_ context.Context, p *pipeline.Pipeline) error { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Stop(_ context.Context) error { + start := time.Now() if atomic.LoadUint32(&c.listeners) > 0 { c.stopCh <- struct{}{} c.stopCh <- struct{}{} @@ -334,12 +337,14 @@ func (c *consumer) Stop(_ context.Context) error { Event: events.EventPipeStopped, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) return nil } func (c *consumer) Pause(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested pause on: ", p) @@ -361,11 +366,13 @@ func (c *consumer) Pause(_ context.Context, p string) { Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } func (c *consumer) Resume(_ context.Context, p string) { + start := time.Now() pipe := c.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { c.log.Error("no such pipeline", "requested resume on: ", p) @@ -389,7 +396,8 @@ func (c *consumer) Resume(_ context.Context, p string) { Event: events.EventPipeActive, Driver: pipe.Driver(), Pipeline: pipe.Name(), - Start: time.Now(), + Start: start, + Elapsed: time.Since(start), }) } |