summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/priorityqueue/binary_heap.go6
-rw-r--r--pkg/priorityqueue/interface.go1
-rw-r--r--plugins/jobs/plugin.go15
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml6
4 files changed, 24 insertions, 4 deletions
diff --git a/pkg/priorityqueue/binary_heap.go b/pkg/priorityqueue/binary_heap.go
index 47fdf5e5..a8d80fc0 100644
--- a/pkg/priorityqueue/binary_heap.go
+++ b/pkg/priorityqueue/binary_heap.go
@@ -68,6 +68,10 @@ func (bh *BinHeap) fixDown(curr, end int) {
}
}
+func (bh *BinHeap) Len() uint64 {
+ return atomic.LoadUint64(&bh.len)
+}
+
func (bh *BinHeap) Insert(item Item) {
bh.cond.L.Lock()
bh.items = append(bh.items, item)
@@ -87,7 +91,7 @@ func (bh *BinHeap) GetMax() Item {
bh.cond.L.Lock()
defer bh.cond.L.Unlock()
- if atomic.LoadUint64(&bh.len) == 0 {
+ for atomic.LoadUint64(&bh.len) == 0 {
bh.cond.Wait()
}
diff --git a/pkg/priorityqueue/interface.go b/pkg/priorityqueue/interface.go
index 100aa667..7c053e6d 100644
--- a/pkg/priorityqueue/interface.go
+++ b/pkg/priorityqueue/interface.go
@@ -3,6 +3,7 @@ package priorityqueue
type Queue interface {
Insert(item Item)
GetMax() Item
+ Len() uint64
}
// Item represents binary heap item
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index d603dce6..df34856e 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -46,6 +46,9 @@ type Plugin struct {
// parent config for broken options. keys are pipelines names, values - pointers to the associated pipeline
pipelines sync.Map
+
+ // initial set of the pipelines to consume
+ consume map[string]struct{}
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -65,12 +68,19 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.events = events.NewEventsHandler()
p.jobConstructors = make(map[string]jobs.Constructor)
p.consumers = make(map[string]jobs.Consumer)
+ p.consume = make(map[string]struct{})
// initial set of pipelines
for i := range p.cfg.Pipelines {
p.pipelines.Store(i, p.cfg.Pipelines[i])
}
+ if len(p.cfg.Consume) > 0 {
+ for i := 0; i < len(p.cfg.Consume); i++ {
+ p.consume[p.cfg.Consume[i]] = struct{}{}
+ }
+ }
+
// initialize priority queue
p.queue = priorityqueue.NewBinHeap()
p.log = log
@@ -87,6 +97,11 @@ func (p *Plugin) Serve() chan error {
// pipeline name (ie test-local, sqs-aws, etc)
name := key.(string)
+ // skip pipelines which are not initialized to consume
+ if _, ok := p.consume[name]; !ok {
+ return true
+ }
+
// pipeline associated with the name
pipe := value.(*pipeline.Pipeline)
// driver for the pipeline (ie amqp, ephemeral, etc)
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index c81ba6ef..bb6c477a 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -1,5 +1,5 @@
rpc:
- listen: unix:///home/valery/Downloads/rr.sock
+ listen: unix:///tmp/rr.sock
server:
command: "php ../../client.php echo pipes"
@@ -22,7 +22,7 @@ sqs:
jobs:
- num_pollers: 32
+ num_pollers: 64
# worker pool configuration
pool:
num_workers: 10
@@ -54,5 +54,5 @@ jobs:
MessageRetentionPeriod: 86400
# list of pipelines to be consumed by the server, keep empty if you want to start consuming manually
- consume: [ "test-local", "test-1" ]
+ consume: [ "test-local" ]