diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d603dce6..df34856e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -46,6 +46,9 @@ type Plugin struct { // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline pipelines sync.Map + + // initial set of the pipelines to consume + consume map[string]struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,12 +68,19 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events = events.NewEventsHandler() p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) + p.consume = make(map[string]struct{}) // initial set of pipelines for i := range p.cfg.Pipelines { p.pipelines.Store(i, p.cfg.Pipelines[i]) } + if len(p.cfg.Consume) > 0 { + for i := 0; i < len(p.cfg.Consume); i++ { + p.consume[p.cfg.Consume[i]] = struct{}{} + } + } + // initialize priority queue p.queue = priorityqueue.NewBinHeap() p.log = log @@ -87,6 +97,11 @@ func (p *Plugin) Serve() chan error { // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) + // skip pipelines which are not initialized to consume + if _, ok := p.consume[name]; !ok { + return true + } + // pipeline associated with the name pipe := value.(*pipeline.Pipeline) // driver for the pipeline (ie amqp, ephemeral, etc) |