diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index f0992cd6..91b8eda9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -25,7 +25,7 @@ type Config struct { Prefetch uint64 `mapstructure:"prefetch"` } -type JobConsumer struct { +type consumer struct { cfg *Config log logger.Logger eh events.Handler @@ -43,10 +43,10 @@ type JobConsumer struct { stopCh chan struct{} } -func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { +func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { const op = errors.Op("new_ephemeral_pipeline") - jb := &JobConsumer{ + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return jb, nil } -func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { - jb := &JobConsumer{ +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) { + jb := &consumer{ log: log, pq: pq, eh: eh, @@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { +func (j *consumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return nil } -func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { +func (j *consumer) State(_ context.Context) (*jobState.State, error) { pipe := j.pipeline.Load().(*pipeline.Pipeline) return &jobState.State{ Pipeline: pipe.Name(), @@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { }, nil } -func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { +func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { j.pipeline.Store(pipeline) return nil } -func (j *JobConsumer) Pause(_ context.Context, p string) { +func (j *consumer) Pause(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) { }) } -func (j *JobConsumer) Resume(_ context.Context, p string) { +func (j *consumer) Resume(_ context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { +func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(ctx context.Context) error { +func (j *consumer) Stop(ctx context.Context) error { const op = errors.Op("ephemeral_plugin_stop") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error { } } -func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { +func (j *consumer) handleItem(ctx context.Context, msg *Item) error { const op = errors.Op("ephemeral_handle_request") // handle timeouts // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) @@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { } } -func (j *JobConsumer) consume() { +func (j *consumer) consume() { go func() { // redirect for { |