diff options
author | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
commit | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch) | |
tree | 68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/drivers/ephemeral/consumer.go | |
parent | d72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff) |
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 9de64b82..043da118 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -1,6 +1,7 @@ package ephemeral import ( + "context" "sync" "sync/atomic" "time" @@ -82,7 +83,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand return jb, nil } -func (j *JobConsumer) Push(jb *job.Job) error { +func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered @@ -139,7 +140,7 @@ func (j *JobConsumer) consume() { } } -func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -150,7 +151,7 @@ func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Pause(pipeline string) { +func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off @@ -166,7 +167,7 @@ func (j *JobConsumer) Pause(pipeline string) { }) } -func (j *JobConsumer) Resume(pipeline string) { +func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on @@ -183,7 +184,7 @@ func (j *JobConsumer) Resume(pipeline string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -193,7 +194,7 @@ func (j *JobConsumer) Run(pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop() error { +func (j *JobConsumer) Stop(context.Context) error { var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) |