diff options
author | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
commit | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch) | |
tree | 68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/drivers/sqs/consumer.go | |
parent | d72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff) |
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 21 |
1 files changed, 9 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 08a6170e..b81d08e5 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -238,7 +238,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf return jb, nil } -func (j *JobConsumer) Push(jb *job.Job) error { +func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("sqs_push") // check if the pipeline registered @@ -256,9 +256,6 @@ func (j *JobConsumer) Push(jb *job.Job) error { msg := fromJob(jb) - // 10 seconds deadline to make a request TODO ??? - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) - defer cancel() // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. _, err := j.client.SendMessage(ctx, msg.pack(j.queueURL)) @@ -269,12 +266,12 @@ func (j *JobConsumer) Push(jb *job.Job) error { return nil } -func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) +func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { + j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") j.Lock() @@ -288,7 +285,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) // start listener - go j.listen() + go j.listen(context.Background()) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -300,7 +297,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop() error { +func (j *JobConsumer) Stop(context.Context) error { j.pauseCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -313,7 +310,7 @@ func (j *JobConsumer) Stop() error { return nil } -func (j *JobConsumer) Pause(p string) { +func (j *JobConsumer) Pause(ctx context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -341,7 +338,7 @@ func (j *JobConsumer) Pause(p string) { }) } -func (j *JobConsumer) Resume(p string) { +func (j *JobConsumer) Resume(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -357,7 +354,7 @@ func (j *JobConsumer) Resume(p string) { } // start listener - go j.listen() + go j.listen(context.Background()) // increase num of listeners atomic.AddUint32(&j.listeners, 1) |