diff options
author | Valery Piashchynski <[email protected]> | 2021-09-02 17:52:18 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-02 17:52:18 +0300 |
commit | c64005501f92888c10a61481745df91c7c50639f (patch) | |
tree | e0e76c148ce6016216082e610dce46b4608f472b /plugins | |
parent | 74c327a86e48ccc9d58833fce994ea134169d0a9 (diff) |
Destroy localPrefetch channel on stop
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/rpc.go | 2 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 6 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/item.go | 1 |
3 files changed, 6 insertions, 3 deletions
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 94f903d5..62186981 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -138,7 +138,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { // from converts from transport entity to domain func (r *rpc) from(j *jobsv1beta.Job) *job.Job { - headers := map[string][]string{} + headers := make(map[string][]string, len(j.GetHeaders())) for k, v := range j.GetHeaders() { headers[k] = v.GetValue() diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index c2cc303b..fbdedefe 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -62,7 +62,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh } if jb.cfg.Prefetch == 0 { - jb.cfg.Prefetch = 100 + jb.cfg.Prefetch = 100_000 } // initialize a local queue @@ -76,7 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand log: log, pq: pq, eh: eh, - localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100)), + localPrefetch: make(chan *Item, pipeline.Int(prefetch, 100_000)), goroutines: 0, active: utils.Int64(0), delayed: utils.Int64(0), @@ -199,6 +199,8 @@ func (c *consumer) Stop(_ context.Context) error { <-c.localPrefetch } + c.localPrefetch = nil + c.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, Pipeline: pipe.Name(), diff --git a/plugins/memory/memoryjobs/item.go b/plugins/memory/memoryjobs/item.go index 8224c26b..f4d62ada 100644 --- a/plugins/memory/memoryjobs/item.go +++ b/plugins/memory/memoryjobs/item.go @@ -124,6 +124,7 @@ func fromJob(job *job.Job) *Item { Job: job.Job, Ident: job.Ident, Payload: job.Payload, + Headers: job.Headers, Options: &Options{ Priority: job.Options.Priority, Pipeline: job.Options.Pipeline, |