summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
committerValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
commit1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch)
tree68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/drivers/sqs/consumer.go
parentd72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff)
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go21
1 files changed, 9 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 08a6170e..b81d08e5 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -238,7 +238,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
return jb, nil
}
-func (j *JobConsumer) Push(jb *job.Job) error {
+func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("sqs_push")
// check if the pipeline registered
@@ -256,9 +256,6 @@ func (j *JobConsumer) Push(jb *job.Job) error {
msg := fromJob(jb)
- // 10 seconds deadline to make a request TODO ???
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
- defer cancel()
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
_, err := j.client.SendMessage(ctx, msg.pack(j.queueURL))
@@ -269,12 +266,12 @@ func (j *JobConsumer) Push(jb *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
j.Lock()
@@ -288,7 +285,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
// start listener
- go j.listen()
+ go j.listen(context.Background())
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
@@ -300,7 +297,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop() error {
+func (j *JobConsumer) Stop(context.Context) error {
j.pauseCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -313,7 +310,7 @@ func (j *JobConsumer) Stop() error {
return nil
}
-func (j *JobConsumer) Pause(p string) {
+func (j *JobConsumer) Pause(ctx context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -341,7 +338,7 @@ func (j *JobConsumer) Pause(p string) {
})
}
-func (j *JobConsumer) Resume(p string) {
+func (j *JobConsumer) Resume(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -357,7 +354,7 @@ func (j *JobConsumer) Resume(p string) {
}
// start listener
- go j.listen()
+ go j.listen(context.Background())
// increase num of listeners
atomic.AddUint32(&j.listeners, 1)