summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go100
1 files changed, 100 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
new file mode 100644
index 00000000..fd7a3902
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -0,0 +1,100 @@
+package beanstalk
+
+import (
+ "sync"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/errors"
+)
+
+type ConnPool struct {
+ sync.RWMutex
+ conn *beanstalk.Conn
+ connT *beanstalk.Conn
+ ts *beanstalk.TubeSet
+ t *beanstalk.Tube
+
+ network string
+ address string
+ tName string
+ tout time.Duration
+}
+
+func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
+ connT, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ connTS, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ tube := beanstalk.NewTube(connT, tName)
+ ts := beanstalk.NewTubeSet(connTS, tName)
+
+ return &ConnPool{
+ network: network,
+ address: address,
+ tName: tName,
+ tout: tout,
+ conn: connTS,
+ connT: connT,
+ ts: ts,
+ t: tube,
+ }, nil
+}
+
+func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.t.Put(body, pri, delay, ttr)
+}
+
+// Reserve reserves and returns a job from one of the tubes in t. If no
+// job is available before time timeout has passed, Reserve returns a
+// ConnError recording ErrTimeout.
+//
+// Typically, a client will reserve a job, perform some work, then delete
+// the job with Conn.Delete.
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.ts.Reserve(reserveTimeout)
+}
+
+func (cp *ConnPool) Delete(id uint64) error {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.conn.Delete(id)
+}
+
+func (cp *ConnPool) Redial() error {
+ const op = errors.Op("connection_pool_redial")
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.Lock()
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+ cp.Unlock()
+ return nil
+}