summaryrefslogtreecommitdiff
path: root/plugins/beanstalk/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/beanstalk/connection.go')
-rw-r--r--plugins/beanstalk/connection.go223
1 files changed, 0 insertions, 223 deletions
diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go
deleted file mode 100644
index d3241b37..00000000
--- a/plugins/beanstalk/connection.go
+++ /dev/null
@@ -1,223 +0,0 @@
-package beanstalk
-
-import (
- "context"
- "net"
- "sync"
- "time"
-
- "github.com/beanstalkd/go-beanstalk"
- "github.com/cenkalti/backoff/v4"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type ConnPool struct {
- sync.RWMutex
-
- log logger.Logger
-
- conn *beanstalk.Conn
- connT *beanstalk.Conn
- ts *beanstalk.TubeSet
- t *beanstalk.Tube
-
- network string
- address string
- tName string
- tout time.Duration
-}
-
-func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) {
- connT, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- connTS, err := beanstalk.DialTimeout(network, address, tout)
- if err != nil {
- return nil, err
- }
-
- tube := beanstalk.NewTube(connT, tName)
- ts := beanstalk.NewTubeSet(connTS, tName)
-
- return &ConnPool{
- log: log,
- network: network,
- address: address,
- tName: tName,
- tout: tout,
- conn: connTS,
- connT: connT,
- ts: ts,
- t: tube,
- }, nil
-}
-
-// Put the payload
-// TODO use the context ??
-func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- // TODO(rustatian): redial based on the token
- id, err := cp.t.Put(body, pri, delay, ttr)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry put only when we redialed
- return cp.t.Put(body, pri, delay, ttr)
- }
- }
-
- return id, nil
-}
-
-// Reserve reserves and returns a job from one of the tubes in t. If no
-// job is available before time timeout has passed, Reserve returns a
-// ConnError recording ErrTimeout.
-//
-// Typically, a client will reserve a job, perform some work, then delete
-// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- id, body, err := cp.ts.Reserve(reserveTimeout)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Reserve only when we redialed
- return cp.ts.Reserve(reserveTimeout)
- }
- }
-
- return id, body, nil
-}
-
-func (cp *ConnPool) Delete(_ context.Context, id uint64) error {
- cp.RLock()
- defer cp.RUnlock()
-
- err := cp.conn.Delete(id)
- if err != nil {
- // errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
- if errN != nil {
- return errors.Errorf("err: %s\nerr redial: %s", err, errN)
- } else {
- // retry Delete only when we redialed
- return cp.conn.Delete(id)
- }
- }
- return nil
-}
-
-func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) {
- cp.RLock()
- defer cp.RUnlock()
-
- stat, err := cp.conn.Stats()
- if err != nil {
- errR := cp.checkAndRedial(err)
- if errR != nil {
- return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR)
- } else {
- return cp.conn.Stats()
- }
- }
-
- return stat, nil
-}
-
-func (cp *ConnPool) redial() error {
- const op = errors.Op("connection_pool_redial")
-
- cp.Lock()
- // backoff here
- expb := backoff.NewExponentialBackOff()
- // TODO(rustatian) set via config
- expb.MaxElapsedTime = time.Minute
-
- operation := func() error {
- connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
- if connT == nil {
- return errors.E(op, errors.Str("connectionT is nil"))
- }
-
- connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
- if err != nil {
- return err
- }
-
- if connTS == nil {
- return errors.E(op, errors.Str("connectionTS is nil"))
- }
-
- cp.t = beanstalk.NewTube(connT, cp.tName)
- cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
- cp.conn = connTS
- cp.connT = connT
-
- cp.log.Info("beanstalk redial was successful")
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- cp.Unlock()
- return retryErr
- }
- cp.Unlock()
-
- return nil
-}
-
-var connErrors = map[string]struct{}{"EOF": {}}
-
-func (cp *ConnPool) checkAndRedial(err error) error {
- const op = errors.Op("connection_pool_check_redial")
- switch et := err.(type) { //nolint:gocritic
- // check if the error
- case beanstalk.ConnError:
- switch bErr := et.Err.(type) {
- case *net.OpError:
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", bErr, errR))
- }
-
- // if redial was successful -> continue listening
- return nil
- default:
- if _, ok := connErrors[et.Err.Error()]; ok {
- // if error is related to the broken connection - redial
- cp.RUnlock()
- errR := cp.redial()
- cp.RLock()
- // if redial failed - return
- if errR != nil {
- return errors.E(op, errors.Errorf("%v:%v", err, errR))
- }
- // if redial was successful -> continue listening
- return nil
- }
- }
- }
-
- // return initial error
- return err
-}