diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 00:45:13 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 00:45:13 +0300 |
commit | e4b955135692bc2bc0fd712b6779d83d32fbd17c (patch) | |
tree | b3fe2d03ca51d89f034fef9ce1e4148d4acb8b5a /plugins | |
parent | b2da831f47284974551710d2767a7bdde0efa51d (diff) |
Update beanstalk redial algo.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 126 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 10 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 45 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 41 |
4 files changed, 108 insertions, 114 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index fd7a3902..62301bed 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -1,15 +1,21 @@ package beanstalk import ( + "strings" "sync" "time" "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" ) type ConnPool struct { sync.RWMutex + + log logger.Logger + conn *beanstalk.Conn connT *beanstalk.Conn ts *beanstalk.TubeSet @@ -21,7 +27,7 @@ type ConnPool struct { tout time.Duration } -func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) { +func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { connT, err := beanstalk.DialTimeout(network, address, tout) if err != nil { return nil, err @@ -36,6 +42,7 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, ts := beanstalk.NewTubeSet(connTS, tName) return &ConnPool{ + log: log, network: network, address: address, tName: tName, @@ -50,7 +57,17 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() - return cp.t.Put(body, pri, delay, ttr) + + id, err := cp.t.Put(body, pri, delay, ttr) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, errN + } + } + + return id, nil } // Reserve reserves and returns a job from one of the tubes in t. If no @@ -59,42 +76,107 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint // // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) { + +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() - return cp.ts.Reserve(reserveTimeout) + + id, body, err := cp.ts.Reserve(reserveTimeout) + if err != nil { + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, nil, errN + } + + return 0, nil, err + } + + return id, body, nil } func (cp *ConnPool) Delete(id uint64) error { cp.RLock() defer cp.RUnlock() - return cp.conn.Delete(id) -} -func (cp *ConnPool) Redial() error { - const op = errors.Op("connection_pool_redial") - connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + err := cp.conn.Delete(id) if err != nil { + errN := cp.checkAndRedial(err) + if errN != nil { + return errN + } + return err } - if connT == nil { - return errors.E(op, errors.Str("connectionT is nil")) + return nil +} + +func (cp *ConnPool) redial() error { + const op = errors.Op("connection_pool_redial") + + cp.Lock() + // backoff here + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + operation := func() error { + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + + cp.log.Info("beanstalk redial was successful") + return nil } - connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) - if err != nil { - return err + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + cp.Unlock() + return retryErr } + cp.Unlock() + + return nil +} - if connTS == nil { - return errors.E(op, errors.Str("connectionTS is nil")) +var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"} + +func (cp *ConnPool) checkAndRedial(err error) error { + const op = errors.Op("connection_pool_check_redial") + + for _, errStr := range connErrors { + if connErr, ok := err.(beanstalk.ConnError); ok { + // if error is related to the broken connection - redial + if strings.Contains(errStr, connErr.Err.Error()) { + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", err, errR)) + } + // if redial was successful -> continue listening + return nil + } + } } - cp.Lock() - cp.t = beanstalk.NewTube(connT, cp.tName) - cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) - cp.conn = connTS - cp.connT = connT - cp.Unlock() return nil } diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index cce85c99..1c2e9781 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -66,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log) if err != nil { return nil, errors.E(op, err) } @@ -90,9 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}), } - // start redial listener - go jc.redial() - return jc, nil } @@ -121,7 +118,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log) if err != nil { return nil, errors.E(op, err) } @@ -145,9 +142,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu reconnectCh: make(chan struct{}, 2), } - // start redial listener - go jc.redial() - return jc, nil } func (j *JobConsumer) Push(jb *job.Job) error { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 33dd4fe5..ec0b5ca8 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,15 +1,6 @@ package beanstalk -import ( - "time" - - "github.com/beanstalkd/go-beanstalk" - "github.com/cenkalti/backoff/v4" - "github.com/spiral/errors" -) - -func (j *JobConsumer) listen() { //nolint:gocognit - const op = errors.Op("beanstalk_listen") +func (j *JobConsumer) listen() { for { select { case <-j.stopCh: @@ -18,39 +9,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit default: id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { - // reserve timeout - if connErr, ok := err.(beanstalk.ConnError); ok { - switch connErr.Err { - case beanstalk.ErrTimeout: - j.log.Warn("timeout expired", "warn", connErr.Error()) - continue - default: - j.log.Error("beanstalk connection error", "error", connErr.Error()) - - // backoff here - expb := backoff.NewExponentialBackOff() - // set the retry timeout (minutes) - expb.MaxElapsedTime = time.Minute * 5 - - operation := func() error { - errR := j.pool.Redial() - if errR != nil { - return errors.E(op, errR) - } - - j.log.Info("beanstalk redial was successful") - // reassign a pool - return nil - } - - retryErr := backoff.Retry(operation, expb) - if retryErr != nil { - j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr) - return - } - continue - } - } + // in case of other error - continue j.log.Error("beanstalk reserve", "error", err) continue } diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go deleted file mode 100644 index 784337ad..00000000 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ /dev/null @@ -1,41 +0,0 @@ -package beanstalk - -import ( - "sync/atomic" - "time" - - "github.com/cenkalti/backoff/v4" -) - -func (j *JobConsumer) redial() { - for range j.reconnectCh { - // backoff here - expb := backoff.NewExponentialBackOff() - // set the retry timeout (minutes) - expb.MaxElapsedTime = time.Minute * 5 - - op := func() error { - err := j.pool.Redial() - if err != nil { - return err - } - - j.log.Info("beanstalk redial was successful") - // reassign a pool - return nil - } - - retryErr := backoff.Retry(op, expb) - if retryErr != nil { - j.log.Error("beanstalk backoff failed", "error", retryErr) - continue - } - - // restart listener - if atomic.LoadUint32(&j.listeners) == 1 { - // stop previous listener - j.stopCh <- struct{}{} - go j.listen() - } - } -} |