diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index dec54426..90da3801 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -2,6 +2,7 @@ package beanstalk import ( "bytes" + "context" "strings" "sync/atomic" "time" @@ -139,7 +140,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(jb *job.Job) error { +func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -173,9 +174,9 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - errD := j.pool.Delete(id) + errD := j.pool.Delete(ctx, id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) } @@ -185,13 +186,13 @@ func (j *JobConsumer) Push(jb *job.Job) error { return nil } -func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error { // register the pipeline - j.pipeline.Store(pipeline) + j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -203,7 +204,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - go j.listen() + go j.listen(ctx) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -215,7 +216,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop() error { +func (j *JobConsumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -232,7 +233,7 @@ func (j *JobConsumer) Stop() error { return nil } -func (j *JobConsumer) Pause(p string) { +func (j *JobConsumer) Pause(ctx context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -259,7 +260,7 @@ func (j *JobConsumer) Pause(p string) { }) } -func (j *JobConsumer) Resume(p string) { +func (j *JobConsumer) Resume(ctx context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -275,7 +276,7 @@ func (j *JobConsumer) Resume(p string) { } // start listener - go j.listen() + go j.listen(ctx) // increase num of listeners atomic.AddUint32(&j.listeners, 1) |