diff options
Diffstat (limited to 'plugins/memory/memoryjobs')
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 6 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/item.go | 1 |
2 files changed, 5 insertions, 2 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index c2cc303b..fbdedefe 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100 + jb.cfg.Prefetch = 100_000 } // initialize a local queue @@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand log: log, pq: pq, eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), @@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error { <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go index 8224c26b..f4d62ada 100644 --- a/plugins/memory/memoryjobs/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, |