summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go44
1 files changed, 33 insertions, 11 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 62301bed..fc659902 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,7 +1,7 @@
package beanstalk
import (
- "strings"
+ "net"
"sync"
"time"
@@ -64,6 +64,9 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, errN
+ } else {
+ // retry put only when we redialed
+ return cp.t.Put(body, pri, delay, ttr)
}
}
@@ -83,12 +86,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return 0, nil, errN
+ } else {
+ // retry Reserve only when we redialed
+ return cp.ts.Reserve(reserveTimeout)
}
-
- return 0, nil, err
}
return id, body, nil
@@ -100,12 +105,14 @@ func (cp *ConnPool) Delete(id uint64) error {
err := cp.conn.Delete(id)
if err != nil {
+ // errN contains both, err and internal checkAndRedial error
errN := cp.checkAndRedial(err)
if errN != nil {
return errN
+ } else {
+ // retry Delete only when we redialed
+ return cp.conn.Delete(id)
}
-
- return err
}
return nil
}
@@ -156,15 +163,29 @@ func (cp *ConnPool) redial() error {
return nil
}
-var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+var connErrors = map[string]struct{}{"EOF": {}}
func (cp *ConnPool) checkAndRedial(err error) error {
const op = errors.Op("connection_pool_check_redial")
+ switch et := err.(type) {
+
+ // check if the error
+ case beanstalk.ConnError:
+ switch bErr := et.Err.(type) {
+ case *net.OpError:
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
+ }
- for _, errStr := range connErrors {
- if connErr, ok := err.(beanstalk.ConnError); ok {
- // if error is related to the broken connection - redial
- if strings.Contains(errStr, connErr.Err.Error()) {
+ // if redial was successful -> continue listening
+ return nil
+ default:
+ if _, ok := connErrors[et.Err.Error()]; ok {
+ // if error is related to the broken connection - redial
cp.RUnlock()
errR := cp.redial()
cp.RLock()
@@ -178,5 +199,6 @@ func (cp *ConnPool) checkAndRedial(err error) error {
}
}
- return nil
+ // return initial error
+ return err
}