diff options
Diffstat (limited to 'plugins/jobs/brokers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/ephemeral/consumer.go | 42 |
1 files changed, 33 insertions, 9 deletions
diff --git a/plugins/jobs/brokers/ephemeral/consumer.go b/plugins/jobs/brokers/ephemeral/consumer.go index 8f6f4b5f..9d79221c 100644 --- a/plugins/jobs/brokers/ephemeral/consumer.go +++ b/plugins/jobs/brokers/ephemeral/consumer.go @@ -22,14 +22,17 @@ type JobBroker struct { queues sync.Map pq priorityqueue.Queue localQueue chan *Item + + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q priorityqueue.Queue) (*JobBroker, error) { const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ - log: log, - pq: q, + log: log, + pq: q, + stopCh: make(chan struct{}, 1), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -50,6 +53,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, q return jb, nil } +func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobBroker, error) { + panic("not implemented") +} + func (j *JobBroker) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") @@ -82,8 +89,13 @@ func (j *JobBroker) Push(job *structs.Job) error { func (j *JobBroker) consume() { // redirect - for item := range j.localQueue { - j.pq.Insert(item) + for { + select { + case item := <-j.localQueue: + j.pq.Insert(item) + case <-j.stopCh: + return + } } } @@ -98,11 +110,6 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } -// Consume is no-op for the ephemeral -func (j *JobBroker) Consume(_ *pipeline.Pipeline) error { - return nil -} - func (j *JobBroker) Pause(pipeline string) { if q, ok := j.queues.Load(pipeline); ok { if q == true { @@ -132,3 +139,20 @@ func (j *JobBroker) List() []string { return out } + +// Run is no-op for the ephemeral +func (j *JobBroker) Run(_ *pipeline.Pipeline) error { + return nil +} + +func (j *JobBroker) Stop() error { + j.queues.Range(func(key, _ interface{}) bool { + j.queues.Delete(key) + return true + }) + + // return from the consumer + j.stopCh <- struct{}{} + + return nil +} |