summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go23
1 files changed, 12 insertions, 11 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index dec54426..90da3801 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -2,6 +2,7 @@ package beanstalk
import (
"bytes"
+ "context"
"strings"
"sync/atomic"
"time"
@@ -139,7 +140,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return jc, nil
}
-func (j *JobConsumer) Push(jb *job.Job) error {
+func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("beanstalk_push")
// check if the pipeline registered
@@ -173,9 +174,9 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.pool.Put(bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
- errD := j.pool.Delete(id)
+ errD := j.pool.Delete(ctx, id)
if errD != nil {
return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
}
@@ -185,13 +186,13 @@ func (j *JobConsumer) Push(jb *job.Job) error {
return nil
}
-func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error {
// register the pipeline
- j.pipeline.Store(pipeline)
+ j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("beanstalk_run")
// check if the pipeline registered
@@ -203,7 +204,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- go j.listen()
+ go j.listen(ctx)
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
@@ -215,7 +216,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop() error {
+func (j *JobConsumer) Stop(context.Context) error {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if atomic.LoadUint32(&j.listeners) == 1 {
@@ -232,7 +233,7 @@ func (j *JobConsumer) Stop() error {
return nil
}
-func (j *JobConsumer) Pause(p string) {
+func (j *JobConsumer) Pause(ctx context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -259,7 +260,7 @@ func (j *JobConsumer) Pause(p string) {
})
}
-func (j *JobConsumer) Resume(p string) {
+func (j *JobConsumer) Resume(ctx context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
@@ -275,7 +276,7 @@ func (j *JobConsumer) Resume(p string) {
}
// start listener
- go j.listen()
+ go j.listen(ctx)
// increase num of listeners
atomic.AddUint32(&j.listeners, 1)