summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/ephemeral/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/consumer.go')
-rw-r--r--plugins/jobs/brokers/ephemeral/consumer.go42
1 files changed, 33 insertions, 9 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go
index 8f6f4b5f..9d79221c 100644
--- a/plugins/jobs/brokers/ephemeral/consumer.go
+++ b/plugins/jobs/brokers/ephemeral/consumer.go
@@ -22,14 +22,17 @@ type JobBroker struct {
queues sync.Map
pq priorityqueue.Queue
localQueue chan *Item
+
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) {
const op = errors.Op("new_ephemeral_pipeline")
jb := &JobBroker{
- log: log,
- pq: q,
+ log: log,
+ pq: q,
+ stopCh: make(chan struct{}, 1),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -50,6 +53,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q
return jb, nil
}
+func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) {
+ panic("not implemented")
+}
+
func (j *JobBroker) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
@@ -82,8 +89,13 @@ func (j *JobBroker) Push(job *structs.Job) error {
func (j *JobBroker) consume() {
// redirect
- for item := range j.localQueue {
- j.pq.Insert(item)
+ for {
+ select {
+ case item := <-j.localQueue:
+ j.pq.Insert(item)
+ case <-j.stopCh:
+ return
+ }
}
}
@@ -98,11 +110,6 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error {
return nil
}
-// Consume is no-op for the ephemeral
-func (j *JobBroker) Consume(_ *pipeline.Pipeline) error {
- return nil
-}
-
func (j *JobBroker) Pause(pipeline string) {
if q, ok := j.queues.Load(pipeline); ok {
if q == true {
@@ -132,3 +139,20 @@ func (j *JobBroker) List() []string {
return out
}
+
+// Run is no-op for the ephemeral
+func (j *JobBroker) Run(_ *pipeline.Pipeline) error {
+ return nil
+}
+
+func (j *JobBroker) Stop() error {
+ j.queues.Range(func(key, _ interface{}) bool {
+ j.queues.Delete(key)
+ return true
+ })
+
+ // return from the consumer
+ j.stopCh <- struct{}{}
+
+ return nil
+}