summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 00:45:13 +0300
committerValery Piashchynski <[email protected]>2021-07-22 00:45:13 +0300
commite4b955135692bc2bc0fd712b6779d83d32fbd17c (patch)
treeb3fe2d03ca51d89f034fef9ce1e4148d4acb8b5a /plugins
parentb2da831f47284974551710d2767a7bdde0efa51d (diff)
Update beanstalk redial algo.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go126
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go10
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go45
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go41
4 files changed, 108 insertions, 114 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index fd7a3902..62301bed 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,15 +1,21 @@
package beanstalk
import (
+ "strings"
"sync"
"time"
"github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
)
type ConnPool struct {
sync.RWMutex
+
+ log logger.Logger
+
conn *beanstalk.Conn
connT *beanstalk.Conn
ts *beanstalk.TubeSet
@@ -21,7 +27,7 @@ type ConnPool struct {
tout time.Duration
}
-func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
+func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
connT, err := beanstalk.DialTimeout(network, address, tout)
if err != nil {
return nil, err
@@ -36,6 +42,7 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool,
ts := beanstalk.NewTubeSet(connTS, tName)
return &ConnPool{
+ log: log,
network: network,
address: address,
tName: tName,
@@ -50,7 +57,17 @@ func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool,
func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()
- return cp.t.Put(body, pri, delay, ttr)
+
+ id, err := cp.t.Put(body, pri, delay, ttr)
+ if err != nil {
+ // errN contains both, err and internal checkAndRedial error
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, errN
+ }
+ }
+
+ return id, nil
}
// Reserve reserves and returns a job from one of the tubes in t. If no
@@ -59,42 +76,107 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
//
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
+
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
- return cp.ts.Reserve(reserveTimeout)
+
+ id, body, err := cp.ts.Reserve(reserveTimeout)
+ if err != nil {
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return 0, nil, errN
+ }
+
+ return 0, nil, err
+ }
+
+ return id, body, nil
}
func (cp *ConnPool) Delete(id uint64) error {
cp.RLock()
defer cp.RUnlock()
- return cp.conn.Delete(id)
-}
-func (cp *ConnPool) Redial() error {
- const op = errors.Op("connection_pool_redial")
- connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ err := cp.conn.Delete(id)
if err != nil {
+ errN := cp.checkAndRedial(err)
+ if errN != nil {
+ return errN
+ }
+
return err
}
- if connT == nil {
- return errors.E(op, errors.Str("connectionT is nil"))
+ return nil
+}
+
+func (cp *ConnPool) redial() error {
+ const op = errors.Op("connection_pool_redial")
+
+ cp.Lock()
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ operation := func() error {
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+
+ cp.log.Info("beanstalk redial was successful")
+ return nil
}
- connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ cp.Unlock()
+ return retryErr
}
+ cp.Unlock()
+
+ return nil
+}
- if connTS == nil {
- return errors.E(op, errors.Str("connectionTS is nil"))
+var connErrors = []string{"pipe", "read tcp", "write tcp", "connection", "EOF"}
+
+func (cp *ConnPool) checkAndRedial(err error) error {
+ const op = errors.Op("connection_pool_check_redial")
+
+ for _, errStr := range connErrors {
+ if connErr, ok := err.(beanstalk.ConnError); ok {
+ // if error is related to the broken connection - redial
+ if strings.Contains(errStr, connErr.Err.Error()) {
+ cp.RUnlock()
+ errR := cp.redial()
+ cp.RLock()
+ // if redial failed - return
+ if errR != nil {
+ return errors.E(op, errors.Errorf("%v:%v", err, errR))
+ }
+ // if redial was successful -> continue listening
+ return nil
+ }
+ }
}
- cp.Lock()
- cp.t = beanstalk.NewTube(connT, cp.tName)
- cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
- cp.conn = connTS
- cp.connT = connT
- cp.Unlock()
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index cce85c99..1c2e9781 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -66,7 +66,7 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout, log)
if err != nil {
return nil, errors.E(op, err)
}
@@ -90,9 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}),
}
- // start redial listener
- go jc.redial()
-
return jc, nil
}
@@ -121,7 +118,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout, log)
if err != nil {
return nil, errors.E(op, err)
}
@@ -145,9 +142,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
reconnectCh: make(chan struct{}, 2),
}
- // start redial listener
- go jc.redial()
-
return jc, nil
}
func (j *JobConsumer) Push(jb *job.Job) error {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index 33dd4fe5..ec0b5ca8 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,15 +1,6 @@
package beanstalk
-import (
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
-)
-
-func (j *JobConsumer) listen() { //nolint:gocognit
- const op = errors.Op("beanstalk_listen")
+func (j *JobConsumer) listen() {
for {
select {
case <-j.stopCh:
@@ -18,39 +9,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit
default:
id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
- // reserve timeout
- if connErr, ok := err.(beanstalk.ConnError); ok {
- switch connErr.Err {
- case beanstalk.ErrTimeout:
- j.log.Warn("timeout expired", "warn", connErr.Error())
- continue
- default:
- j.log.Error("beanstalk connection error", "error", connErr.Error())
-
- // backoff here
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = time.Minute * 5
-
- operation := func() error {
- errR := j.pool.Redial()
- if errR != nil {
- return errors.E(op, errR)
- }
-
- j.log.Info("beanstalk redial was successful")
- // reassign a pool
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr)
- return
- }
- continue
- }
- }
+ // in case of other error - continue
j.log.Error("beanstalk reserve", "error", err)
continue
}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
deleted file mode 100644
index 784337ad..00000000
--- a/plugins/jobs/drivers/beanstalk/redial.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package beanstalk
-
-import (
- "sync/atomic"
- "time"
-
- "github.com/cenkalti/backoff/v4"
-)
-
-func (j *JobConsumer) redial() {
- for range j.reconnectCh {
- // backoff here
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = time.Minute * 5
-
- op := func() error {
- err := j.pool.Redial()
- if err != nil {
- return err
- }
-
- j.log.Info("beanstalk redial was successful")
- // reassign a pool
- return nil
- }
-
- retryErr := backoff.Retry(op, expb)
- if retryErr != nil {
- j.log.Error("beanstalk backoff failed", "error", retryErr)
- continue
- }
-
- // restart listener
- if atomic.LoadUint32(&j.listeners) == 1 {
- // stop previous listener
- j.stopCh <- struct{}{}
- go j.listen()
- }
- }
-}