diff options
-rw-r--r-- | pkg/priorityqueue/binary_heap.go | 6 | ||||
-rw-r--r-- | pkg/priorityqueue/interface.go | 1 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 15 | ||||
-rw-r--r-- | tests/plugins/jobs/configs/.rr-jobs-init.yaml | 6 |
4 files changed, 24 insertions, 4 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go index 47fdf5e5..a8d80fc0 100644 --- a/pkg/priorityqueue/binary_heap.go +++ b/pkg/priorityqueue/binary_heap.go @@ -68,6 +68,10 @@ func (bh *BinHeap) fixDown(curr, end int) { } } +func (bh *BinHeap) Len() uint64 { + return atomic.LoadUint64(&bh.len) +} + func (bh *BinHeap) Insert(item Item) { bh.cond.L.Lock() bh.items = append(bh.items, item) @@ -87,7 +91,7 @@ func (bh *BinHeap) GetMax() Item { bh.cond.L.Lock() defer bh.cond.L.Unlock() - if atomic.LoadUint64(&bh.len) == 0 { + for atomic.LoadUint64(&bh.len) == 0 { bh.cond.Wait() } diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go index 100aa667..7c053e6d 100644 --- a/pkg/priorityqueue/interface.go +++ b/pkg/priorityqueue/interface.go @@ -3,6 +3,7 @@ package priorityqueue type Queue interface { Insert(item Item) GetMax() Item + Len() uint64 } // Item represents binary heap item diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index d603dce6..df34856e 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -46,6 +46,9 @@ type Plugin struct { // parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline pipelines sync.Map + + // initial set of the pipelines to consume + consume map[string]struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -65,12 +68,19 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.events = events.NewEventsHandler() p.jobConstructors = make(map[string]jobs.Constructor) p.consumers = make(map[string]jobs.Consumer) + p.consume = make(map[string]struct{}) // initial set of pipelines for i := range p.cfg.Pipelines { p.pipelines.Store(i, p.cfg.Pipelines[i]) } + if len(p.cfg.Consume) > 0 { + for i := 0; i < len(p.cfg.Consume); i++ { + p.consume[p.cfg.Consume[i]] = struct{}{} + } + } + // initialize priority queue p.queue = priorityqueue.NewBinHeap() p.log = log @@ -87,6 +97,11 @@ func (p *Plugin) Serve() chan error { // pipeline name (ie test-local, sqs-aws, etc) name := key.(string) + // skip pipelines which are not initialized to consume + if _, ok := p.consume[name]; !ok { + return true + } + // pipeline associated with the name pipe := value.(*pipeline.Pipeline) // driver for the pipeline (ie amqp, ephemeral, etc) diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml index c81ba6ef..bb6c477a 100644 --- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml +++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml @@ -1,5 +1,5 @@ rpc: - listen: unix:///home/valery/Downloads/rr.sock + listen: unix:///tmp/rr.sock server: command: "php ../../client.php echo pipes" @@ -22,7 +22,7 @@ sqs: jobs: - num_pollers: 32 + num_pollers: 64 # worker pool configuration pool: num_workers: 10 @@ -54,5 +54,5 @@ jobs: MessageRetentionPeriod: 86400 # list of pipelines to be consumed by the server, keep empty if you want to start consuming manually - consume: [ "test-local", "test-1" ] + consume: [ "test-local" ] |