diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 13:53:19 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 13:53:19 +0300 |
commit | 05660fcd256963eac94ada90f7baa409344f9e73 (patch) | |
tree | 72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/drivers/beanstalk | |
parent | 182199a6449677a620813e3a8157cd0406095435 (diff) |
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 44 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 9 |
4 files changed, 46 insertions, 13 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 62301bed..fc659902 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,7 +1,7 @@ package beanstalk import ( - "strings" + "net" "sync" "time" @@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint errN := cp.checkAndRedial(err) if errN != nil { return 0, errN + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) } } @@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) } - - return 0, nil, err } return id, body, nil @@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error { err := cp.conn.Delete(id) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return errN + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) } - - return err } return nil } @@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error { return nil } -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} +var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { + + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } - for _, errStr := range connErrors { - if connErr, ok := err.(beanstalk.ConnError); ok { - // if error is related to the broken connection - redial - if strings.Contains(errStr, connErr.Err.Error()) { + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial cp.RUnlock() errR := cp.redial() cp.RLock() @@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error { } } - return nil + // return initial error + return err } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1c2e9781..1490e587 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -224,7 +224,9 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { func (j *JobConsumer) Stop() error { pipe := j.pipeline.Load().(*pipeline.Pipeline) - j.stopCh <- struct{}{} + if atomic.LoadUint32(&j.listeners) == 1 { + j.stopCh <- struct{}{} + } j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 2c2873c2..b797fc12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -100,7 +100,7 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - return nil + return i.Options.conn.Delete(i.Options.id) } func fromJob(job *job.Job) *Item { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index ec0b5ca8..0f98312a 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,5 +1,7 @@ package beanstalk +import "github.com/beanstalkd/go-beanstalk" + func (j *JobConsumer) listen() { for { select { @@ -9,6 +11,13 @@ func (j *JobConsumer) listen() { default: id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + if errB, ok := err.(beanstalk.ConnError); ok { + switch errB.Err { + case beanstalk.ErrTimeout: + j.log.Info("beanstalk reserve timeout", "warn", errB.Op) + continue + } + } // in case of other error - continue j.log.Error("beanstalk reserve", "error", err) continue |