summaryrefslogtreecommitdiff
path: root/plugins/memory/memoryjobs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/memoryjobs')
-rw-r--r--plugins/memory/memoryjobs/consumer.go6
-rw-r--r--plugins/memory/memoryjobs/item.go1
2 files changed, 5 insertions, 2 deletions
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index c2cc303b..fbdedefe 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
if jb.cfg.Prefetch == 0 {
- jb.cfg.Prefetch = 100
+ jb.cfg.Prefetch = 100_000
}
// initialize a local queue
@@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
log: log,
pq: pq,
eh: eh,
- localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)),
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)),
goroutines: 0,
active: utils.Int64(0),
delayed: utils.Int64(0),
@@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error {
<-c.localPrefetch
}
+ c.localPrefetch = nil
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Pipeline: pipe.Name(),
diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go
index 8224c26b..f4d62ada 100644
--- a/plugins/memory/memoryjobs/item.go
+++ b/plugins/memory/memoryjobs/item.go
@@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
+ Headers: job.Headers,
Options: &Options{
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,