summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-07 21:37:37 +0300
committerValery Piashchynski <[email protected]>2021-07-07 21:37:37 +0300
commitb84a7cb26c184b709f18d3d52925b31d49351c03 (patch)
tree5be8fafccbb2e7ad2dc6496e8a5f1d212a65a8bb /plugins
parent60c229c8506df465586434309af5acd1f84e2406 (diff)
New Methods in the binary heap interface...
Add Len() method to the Binary Heaps interface with implementation. Start consumers only for the user-defined set from the config. Add Headers field to the proto Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/plugin.go15
1 files changed, 15 insertions, 0 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d603dce6..df34856e 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -46,6 +46,9 @@ type Plugin struct {
// parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline
pipelines sync.Map
+
+ // initial set of the pipelines to consume
+ consume map[string]struct{}
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -65,12 +68,19 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events = events.NewEventsHandler()
p.jobConstructors = make(map[string]jobs.Constructor)
p.consumers = make(map[string]jobs.Consumer)
+ p.consume = make(map[string]struct{})
// initial set of pipelines
for i := range p.cfg.Pipelines {
p.pipelines.Store(i, p.cfg.Pipelines[i])
}
+ if len(p.cfg.Consume) > 0 {
+ for i := 0; i < len(p.cfg.Consume); i++ {
+ p.consume[p.cfg.Consume[i]] = struct{}{}
+ }
+ }
+
// initialize priority queue
p.queue = priorityqueue.NewBinHeap()
p.log = log
@@ -87,6 +97,11 @@ func (p *Plugin) Serve() chan error {
// pipeline name (ie test-local, sqs-aws, etc)
name := key.(string)
+ // skip pipelines which are not initialized to consume
+ if _, ok := p.consume[name]; !ok {
+ return true
+ }
+
// pipeline associated with the name
pipe := value.(*pipeline.Pipeline)
// driver for the pipeline (ie amqp, ephemeral, etc)