diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 126 |
1 files changed, 104 insertions, 22 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index fd7a3902..62301bed 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,15 +1,21 @@ package beanstalk import ( + "strings" "sync" "time" "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" ) type ConnPool struct { sync.RWMutex + + log logger.Logger + conn *beanstalk.Conn connT *beanstalk.Conn ts *beanstalk.TubeSet @@ -21,7 +27,7 @@ type ConnPool struct { tout time.Duration } -func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) { +func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { connT, err := beanstalk.DialTimeout(network, address, tout) if err != nil { return nil, err @@ -36,6 +42,7 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, ts := beanstalk.NewTubeSet(connTS, tName) return &ConnPool{ + log: log, network: network, address: address, tName: tName, @@ -50,7 +57,17 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() - return cp.t.Put(body, pri, delay, ttr) + + id, err := cp.t.Put(body, pri, delay, ttr) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, errN + } + } + + return id, nil } // Reserve reserves and returns a job from one of the tubes in t. If no @@ -59,42 +76,107 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint // // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) { + +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() - return cp.ts.Reserve(reserveTimeout) + + id, body, err := cp.ts.Reserve(reserveTimeout) + if err != nil { + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, nil, errN + } + + return 0, nil, err + } + + return id, body, nil } func (cp *ConnPool) Delete(id uint64) error { cp.RLock() defer cp.RUnlock() - return cp.conn.Delete(id) -} -func (cp *ConnPool) Redial() error { - const op = errors.Op("connection_pool_redial") - connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + err := cp.conn.Delete(id) if err != nil { + errN := cp.checkAndRedial(err) + if errN != nil { + return errN + } + return err } - if connT == nil { - return errors.E(op, errors.Str("connectionT is nil")) + return nil +} + +func (cp *ConnPool) redial() error { + const op = errors.Op("connection_pool_redial") + + cp.Lock() + // backoff here + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + operation := func() error { + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + + cp.log.Info("beanstalk redial was successful") + return nil } - connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + cp.Unlock() + return retryErr } + cp.Unlock() + + return nil +} - if connTS == nil { - return errors.E(op, errors.Str("connectionTS is nil")) +var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} + +func (cp *ConnPool) checkAndRedial(err error) error { + const op = errors.Op("connection_pool_check_redial") + + for _, errStr := range connErrors { + if connErr, ok := err.(beanstalk.ConnError); ok { + // if error is related to the broken connection - redial + if strings.Contains(errStr, connErr.Err.Error()) { + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", err, errR)) + } + // if redial was successful -> continue listening + return nil + } + } } - cp.Lock() - cp.t = beanstalk.NewTube(connT, cp.tName) - cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) - cp.conn = connTS - cp.connT = connT - cp.Unlock() return nil } |