summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/ephemeral
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go28
1 files changed, 14 insertions, 14 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index f0992cd6..91b8eda9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -25,7 +25,7 @@ type Config struct {
Prefetch uint64 `mapstructure:"prefetch"`
}
-type JobConsumer struct {
+type consumer struct {
cfg *Config
log logger.Logger
eh events.Handler
@@ -43,10 +43,10 @@ type JobConsumer struct {
stopCh chan struct{}
}
-func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
+func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
const op = errors.Op("new_ephemeral_pipeline")
- jb := &JobConsumer{
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -71,8 +71,8 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return jb, nil
}
-func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
- jb := &JobConsumer{
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*consumer, error) {
+ jb := &consumer{
log: log,
pq: pq,
eh: eh,
@@ -88,7 +88,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
return jb, nil
}
-func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
+func (j *consumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
@@ -105,7 +105,7 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return nil
}
-func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
+func (j *consumer) State(_ context.Context) (*jobState.State, error) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
return &jobState.State{
Pipeline: pipe.Name(),
@@ -117,12 +117,12 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) {
}, nil
}
-func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
+func (j *consumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
j.pipeline.Store(pipeline)
return nil
}
-func (j *JobConsumer) Pause(_ context.Context, p string) {
+func (j *consumer) Pause(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -149,7 +149,7 @@ func (j *JobConsumer) Pause(_ context.Context, p string) {
})
}
-func (j *JobConsumer) Resume(_ context.Context, p string) {
+func (j *consumer) Resume(_ context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -175,7 +175,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
}
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
+func (j *consumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -185,7 +185,7 @@ func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(ctx context.Context) error {
+func (j *consumer) Stop(ctx context.Context) error {
const op = errors.Op("ephemeral_plugin_stop")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -207,7 +207,7 @@ func (j *JobConsumer) Stop(ctx context.Context) error {
}
}
-func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+func (j *consumer) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("ephemeral_handle_request")
// handle timeouts
// theoretically, some bad user may send millions requests with a delay and produce a billion (for example)
@@ -245,7 +245,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
}
}
-func (j *JobConsumer) consume() {
+func (j *consumer) consume() {
go func() {
// redirect
for {