diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index ff8f7860..9de64b82 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -22,7 +22,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobBroker struct { +type JobConsumer struct { cfg *Config log logger.Logger eh events.Handler @@ -35,10 +35,10 @@ type JobBroker struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobBroker{ + jb := &JobConsumer{ log: log, pq: pq, eh: eh, @@ -64,8 +64,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { - jb := &JobBroker{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { + jb := &JobConsumer{ log: log, pq: pq, eh: eh, @@ -82,7 +82,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobBroker) Push(jb *job.Job) error { +func (j *JobConsumer) Push(jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -127,7 +127,7 @@ func (j *JobBroker) Push(jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) } -func (j *JobBroker) consume() { +func (j *JobConsumer) consume() { // redirect for { select { @@ -139,7 +139,7 @@ func (j *JobBroker) consume() { } } -func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -150,7 +150,7 @@ func (j *JobBroker) Register(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobBroker) Pause(pipeline string) { +func (j *JobConsumer) Pause(pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -166,7 +166,7 @@ func (j *JobBroker) Pause(pipeline string) { }) } -func (j *JobBroker) Resume(pipeline string) { +func (j *JobConsumer) Resume(pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on @@ -183,7 +183,7 @@ func (j *JobBroker) Resume(pipeline string) { } // Run is no-op for the ephemeral -func (j *JobBroker) Run(pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -193,7 +193,7 @@ func (j *JobBroker) Run(pipe *pipeline.Pipeline) error { return nil } -func (j *JobBroker) Stop() error { +func (j *JobConsumer) Stop() error { var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) |