diff options
author | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-27 12:39:01 +0300 |
commit | 1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch) | |
tree | 68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/drivers/beanstalk | |
parent | d72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff) |
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 25 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 23 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 10 |
3 files changed, 31 insertions, 27 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 6cc50c07..797b4821 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,6 +1,7 @@ package beanstalk import ( + "context" "net" "sync" "time" @@ -54,14 +55,14 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger. }, nil } -func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { +func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) + errN := cp.checkAndRedial(ctx, err) if errN != nil { return 0, errN } else { @@ -80,14 +81,14 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { +func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) + errN := cp.checkAndRedial(ctx, err) if errN != nil { return 0, nil, errN } else { @@ -99,14 +100,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error return id, body, nil } -func (cp *ConnPool) Delete(id uint64) error { +func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { cp.RLock() defer cp.RUnlock() err := cp.conn.Delete(id) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(err) + errN := cp.checkAndRedial(ctx, err) if errN != nil { return errN } else { @@ -117,14 +118,12 @@ func (cp *ConnPool) Delete(id uint64) error { return nil } -func (cp *ConnPool) redial() error { +func (cp *ConnPool) redial(ctx context.Context) error { const op = errors.Op("connection_pool_redial") cp.Lock() // backoff here - expb := backoff.NewExponentialBackOff() - // set the retry timeout (minutes) - expb.MaxElapsedTime = time.Minute * 5 + expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) operation := func() error { connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) @@ -165,7 +164,7 @@ func (cp *ConnPool) redial() error { var connErrors = map[string]struct{}{"EOF": {}} -func (cp *ConnPool) checkAndRedial(err error) error { +func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { const op = errors.Op("connection_pool_check_redial") switch et := err.(type) { //nolint:gocritic // check if the error @@ -173,7 +172,7 @@ func (cp *ConnPool) checkAndRedial(err error) error { switch bErr := et.Err.(type) { case *net.OpError: cp.RUnlock() - errR := cp.redial() + errR := cp.redial(ctx) cp.RLock() // if redial failed - return if errR != nil { @@ -186,7 +185,7 @@ func (cp *ConnPool) checkAndRedial(err error) error { if _, ok := connErrors[et.Err.Error()]; ok { // if error is related to the broken connection - redial cp.RUnlock() - errR := cp.redial() + errR := cp.redial(ctx) cp.RLock() // if redial failed - return if errR != nil { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index dec54426..90da3801 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -2,6 +2,7 @@ package beanstalk import ( "bytes" + "context" "strings" "sync/atomic" "time" @@ -139,7 +140,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return jc, nil } -func (j *JobConsumer) Push(jb *job.Job) error { +func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("beanstalk_push") // check if the pipeline registered @@ -173,9 +174,9 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.pool.Put(bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(ctx, bb.Bytes(), j.tubePriority, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - errD := j.pool.Delete(id) + errD := j.pool.Delete(ctx, id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) } @@ -185,13 +186,13 @@ func (j *JobConsumer) Push(jb *job.Job) error { return nil } -func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(ctx context.Context, p *pipeline.Pipeline) error { // register the pipeline - j.pipeline.Store(pipeline) + j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { const op = errors.Op("beanstalk_run") // check if the pipeline registered @@ -203,7 +204,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - go j.listen() + go j.listen(ctx) j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, @@ -215,7 +216,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop() error { +func (j *JobConsumer) Stop(context.Context) error { pipe := j.pipeline.Load().(*pipeline.Pipeline) if atomic.LoadUint32(&j.listeners) == 1 { @@ -232,7 +233,7 @@ func (j *JobConsumer) Stop() error { return nil } -func (j *JobConsumer) Pause(p string) { +func (j *JobConsumer) Pause(ctx context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -259,7 +260,7 @@ func (j *JobConsumer) Pause(p string) { }) } -func (j *JobConsumer) Resume(p string) { +func (j *JobConsumer) Resume(ctx context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { @@ -275,7 +276,7 @@ func (j *JobConsumer) Resume(p string) { } // start listener - go j.listen() + go j.listen(ctx) // increase num of listeners atomic.AddUint32(&j.listeners, 1) diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 3e9061a3..b872cbd4 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,15 +1,19 @@ package beanstalk -import "github.com/beanstalkd/go-beanstalk" +import ( + "context" -func (j *JobConsumer) listen() { + "github.com/beanstalkd/go-beanstalk" +) + +func (j *JobConsumer) listen(ctx context.Context) { for { select { case <-j.stopCh: j.log.Warn("beanstalk listener stopped") return default: - id, body, err := j.pool.Reserve(j.reserveTimeout) + id, body, err := j.pool.Reserve(ctx, j.reserveTimeout) if err != nil { if errB, ok := err.(beanstalk.ConnError); ok { switch errB.Err { //nolint:gocritic |