summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-02 17:52:18 +0300
committerValery Piashchynski <[email protected]>2021-09-02 17:52:18 +0300
commitc64005501f92888c10a61481745df91c7c50639f (patch)
treee0e76c148ce6016216082e610dce46b4608f472b /plugins
parent74c327a86e48ccc9d58833fce994ea134169d0a9 (diff)
Destroy localPrefetch channel on stop
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/rpc.go2
-rw-r--r--plugins/memory/memoryjobs/consumer.go6
-rw-r--r--plugins/memory/memoryjobs/item.go1
3 files changed, 6 insertions, 3 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index 94f903d5..62186981 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -138,7 +138,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error {
// from converts from transport entity to domain
func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
- headers := map[string][]string{}
+ headers := make(map[string][]string, len(j.GetHeaders()))
for k, v := range j.GetHeaders() {
headers[k] = v.GetValue()
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index c2cc303b..fbdedefe 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
}
if jb.cfg.Prefetch == 0 {
- jb.cfg.Prefetch = 100
+ jb.cfg.Prefetch = 100_000
}
// initialize a local queue
@@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
log: log,
pq: pq,
eh: eh,
- localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)),
+ localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)),
goroutines: 0,
active: utils.Int64(0),
delayed: utils.Int64(0),
@@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error {
<-c.localPrefetch
}
+ c.localPrefetch = nil
+
c.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,
Pipeline: pipe.Name(),
diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go
index 8224c26b..f4d62ada 100644
--- a/plugins/memory/memoryjobs/item.go
+++ b/plugins/memory/memoryjobs/item.go
@@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item {
Job: job.Job,
Ident: job.Ident,
Payload: job.Payload,
+ Headers: job.Headers,
Options: &Options{
Priority: job.Options.Priority,
Pipeline: job.Options.Pipeline,