diff options
author | Valery Piashchynski <[email protected]> | 2021-07-07 21:37:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-07 21:37:37 +0300 |
commit | b84a7cb26c184b709f18d3d52925b31d49351c03 (patch) | |
tree | 5be8fafccbb2e7ad2dc6496e8a5f1d212a65a8bb /plugins | |
parent | 60c229c8506df465586434309af5acd1f84e2406 (diff) |
New Methods in the binary heap interface...
Add Len() method to the Binary Heaps interface with implementation.
Start consumers only for the user-defined set from the config.
Add Headers field to the proto
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/plugin.go | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d603dce6..df34856e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -46,6 +46,9 @@ type Plugin struct { // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline pipelines sync.Map + + // initial set of the pipelines to consume + consume map[string]struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,12 +68,19 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events = events.NewEventsHandler() p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) + p.consume = make(map[string]struct{}) // initial set of pipelines for i := range p.cfg.Pipelines { p.pipelines.Store(i, p.cfg.Pipelines[i]) } + if len(p.cfg.Consume) > 0 { + for i := 0; i < len(p.cfg.Consume); i++ { + p.consume[p.cfg.Consume[i]] = struct{}{} + } + } + // initialize priority queue p.queue = priorityqueue.NewBinHeap() p.log = log @@ -87,6 +97,11 @@ func (p *Plugin) Serve() chan error { // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) + // skip pipelines which are not initialized to consume + if _, ok := p.consume[name]; !ok { + return true + } + // pipeline associated with the name pipe := value.(*pipeline.Pipeline) // driver for the pipeline (ie amqp, ephemeral, etc) |