summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go126
1 files changed, 104 insertions, 22 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index fd7a3902..62301bed 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,15 +1,21 @@
package beanstalk
import (
+ "strings"
"sync"
"time"
"github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
type ConnPool struct {
sync.RWMutex
+
+ log logger.Logger
+
conn *beanstalk.Conn
connT *beanstalk.Conn
ts *beanstalk.TubeSet
@@ -21,7 +27,7 @@ type ConnPool struct {
tout time.Duration
}
-func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
+func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
connT, err := beanstalk.DialTimeout(network, address, tout)
if err != nil {
return nil, err
@@ -36,6 +42,7 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool,
ts := beanstalk.NewTubeSet(connTS, tName)
return &ConnPool{
+ log: log,
network: network,
address: address,
tName: tName,
@@ -50,7 +57,17 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool,
func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()
- return cp.t.Put(body, pri, delay, ttr)
+
+ id, err := cp.t.Put(body, pri, delay, ttr)
+ if err != nil {
+ // errN contains both, err and internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, errN
+ }
+ }
+
+ return id, nil
}
// Reserve reserves and returns a job from one of the tubes in t. If no
@@ -59,42 +76,107 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
//
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
+
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
- return cp.ts.Reserve(reserveTimeout)
+
+ id, body, err := cp.ts.Reserve(reserveTimeout)
+ if err != nil {
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, nil, errN
+ }
+
+ return 0, nil, err
+ }
+
+ return id, body, nil
}
func (cp *ConnPool) Delete(id uint64) error {
cp.RLock()
defer cp.RUnlock()
- return cp.conn.Delete(id)
-}
-func (cp *ConnPool) Redial() error {
- const op = errors.Op("connection_pool_redial")
- connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ err := cp.conn.Delete(id)
if err != nil {
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return errN
+ }
+
return err
}
- if connT == nil {
- return errors.E(op, errors.Str("connectionT is nil"))
+ return nil
+}
+
+func (cp *ConnPool) redial() error {
+ const op = errors.Op("connection_pool_redial")
+
+ cp.Lock()
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ operation := func() error {
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+
+ cp.log.Info("beanstalk redial was successful")
+ return nil
}
- connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ cp.Unlock()
+ return retryErr
}
+ cp.Unlock()
+
+ return nil
+}
- if connTS == nil {
- return errors.E(op, errors.Str("connectionTS is nil"))
+var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+
+func (cp *ConnPool) checkAndRedial(err error) error {
+ const op = errors.Op("connection_pool_check_redial")
+
+ for _, errStr := range connErrors {
+ if connErr, ok := err.(beanstalk.ConnError); ok {
+ // if error is related to the broken connection - redial
+ if strings.Contains(errStr, connErr.Err.Error()) {
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", err, errR))
+ }
+ // if redial was successful -> continue listening
+ return nil
+ }
+ }
}
- cp.Lock()
- cp.t = beanstalk.NewTube(connT, cp.tName)
- cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
- cp.conn = connTS
- cp.connT = connT
- cp.Unlock()
return nil
}