diff options
Diffstat (limited to 'plugins/beanstalk/connection.go')
-rw-r--r-- | plugins/beanstalk/connection.go | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/plugins/beanstalk/connection.go b/plugins/beanstalk/connection.go new file mode 100644 index 00000000..d3241b37 --- /dev/null +++ b/plugins/beanstalk/connection.go @@ -0,0 +1,223 @@ +package beanstalk + +import ( + "context" + "net" + "sync" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type ConnPool struct { + sync.RWMutex + + log logger.Logger + + conn *beanstalk.Conn + connT *beanstalk.Conn + ts *beanstalk.TubeSet + t *beanstalk.Tube + + network string + address string + tName string + tout time.Duration +} + +func NewConnPool(network, address, tName string, tout time.Duration, log logger.Logger) (*ConnPool, error) { + connT, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + connTS, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + tube := beanstalk.NewTube(connT, tName) + ts := beanstalk.NewTubeSet(connTS, tName) + + return &ConnPool{ + log: log, + network: network, + address: address, + tName: tName, + tout: tout, + conn: connTS, + connT: connT, + ts: ts, + t: tube, + }, nil +} + +// Put the payload +// TODO use the context ?? +func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { + cp.RLock() + defer cp.RUnlock() + + // TODO(rustatian): redial based on the token + id, err := cp.t.Put(body, pri, delay, ttr) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry put only when we redialed + return cp.t.Put(body, pri, delay, ttr) + } + } + + return id, nil +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { + cp.RLock() + defer cp.RUnlock() + + id, body, err := cp.ts.Reserve(reserveTimeout) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return 0, nil, errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Reserve only when we redialed + return cp.ts.Reserve(reserveTimeout) + } + } + + return id, body, nil +} + +func (cp *ConnPool) Delete(_ context.Context, id uint64) error { + cp.RLock() + defer cp.RUnlock() + + err := cp.conn.Delete(id) + if err != nil { + // errN contains both, err and internal checkAndRedial error + errN := cp.checkAndRedial(err) + if errN != nil { + return errors.Errorf("err: %s\nerr redial: %s", err, errN) + } else { + // retry Delete only when we redialed + return cp.conn.Delete(id) + } + } + return nil +} + +func (cp *ConnPool) Stats(_ context.Context) (map[string]string, error) { + cp.RLock() + defer cp.RUnlock() + + stat, err := cp.conn.Stats() + if err != nil { + errR := cp.checkAndRedial(err) + if errR != nil { + return nil, errors.Errorf("err: %s\nerr redial: %s", err, errR) + } else { + return cp.conn.Stats() + } + } + + return stat, nil +} + +func (cp *ConnPool) redial() error { + const op = errors.Op("connection_pool_redial") + + cp.Lock() + // backoff here + expb := backoff.NewExponentialBackOff() + // TODO(rustatian) set via config + expb.MaxElapsedTime = time.Minute + + operation := func() error { + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + + cp.log.Info("beanstalk redial was successful") + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + cp.Unlock() + return retryErr + } + cp.Unlock() + + return nil +} + +var connErrors = map[string]struct{}{"EOF": {}} + +func (cp *ConnPool) checkAndRedial(err error) error { + const op = errors.Op("connection_pool_check_redial") + switch et := err.(type) { //nolint:gocritic + // check if the error + case beanstalk.ConnError: + switch bErr := et.Err.(type) { + case *net.OpError: + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", bErr, errR)) + } + + // if redial was successful -> continue listening + return nil + default: + if _, ok := connErrors[et.Err.Error()]; ok { + // if error is related to the broken connection - redial + cp.RUnlock() + errR := cp.redial() + cp.RLock() + // if redial failed - return + if errR != nil { + return errors.E(op, errors.Errorf("%v:%v", err, errR)) + } + // if redial was successful -> continue listening + return nil + } + } + } + + // return initial error + return err +} |