diff options
Diffstat (limited to 'plugins/memory/memoryjobs/consumer.go')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index c2cc303b..fbdedefe 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100 + jb.cfg.Prefetch = 100_000 } // initialize a local queue @@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand log: log, pq: pq, eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), @@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error { <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), |