diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 62301bed..fc659902 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,7 +1,7 @@ package beanstalk import ( - "strings" + "net" "sync" "time" @@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint errN := cp.checkAndRedial(err) if errN != nil { return 0, errN + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) } } @@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) } - - return 0, nil, err } return id, body, nil @@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error { err := cp.conn.Delete(id) if err != nil { + // errN contains both, err and internal checkAndRedial error errN := cp.checkAndRedial(err) if errN != nil { return errN + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) } - - return err } return nil } @@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error { return nil } -var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} +var connErrors = map[string]struct{}{"EOF": {}} func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { + + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } - for _, errStr := range connErrors { - if connErr, ok := err.(beanstalk.ConnError); ok { - // if error is related to the broken connection - redial - if strings.Contains(errStr, connErr.Err.Error()) { + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial cp.RUnlock() errR := cp.redial() cp.RLock() @@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error { } } - return nil + // return initial error + return err } |