1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package beanstalk
import (
"sync"
"time"
"github.com/beanstalkd/go-beanstalk"
"github.com/spiral/errors"
)
// ConnPool holds a pair of beanstalk connections to the same server together
// with the parameters needed to dial them again.
//
// One connection backs the Tube used for producing jobs; the other backs the
// TubeSet used for reserving jobs and is also used directly for deleting them.
type ConnPool struct {
	// RWMutex protects the connection fields below. Put, Reserve and Delete
	// hold the read lock; Redial holds the write lock while swapping them.
	sync.RWMutex

	// Dial parameters, kept so Redial can re-establish both connections.
	network string
	address string
	tName   string
	tout    time.Duration

	// Producer side: connection and the tube built on top of it.
	connT *beanstalk.Conn
	t     *beanstalk.Tube

	// Consumer side: connection and the tube set built on top of it.
	conn *beanstalk.Conn
	ts   *beanstalk.TubeSet
}
// NewConnPool dials two independent connections to the beanstalk server at
// network/address, each bounded by tout, and wraps them in a ConnPool.
//
// The first connection backs the Tube named tName (used by Put); the second
// backs the TubeSet for tName (used by Reserve) and is the connection used by
// Delete.
//
// If either dial fails, the dial error is returned unchanged and no
// connection is left open.
func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
	connT, err := beanstalk.DialTimeout(network, address, tout)
	if err != nil {
		return nil, err
	}

	connTS, err := beanstalk.DialTimeout(network, address, tout)
	if err != nil {
		// The first connection is already open; close it so a failed
		// construction does not leak it. The dial error is what the caller
		// needs to see, so a failure to close is deliberately ignored.
		_ = connT.Close()
		return nil, err
	}

	return &ConnPool{
		network: network,
		address: address,
		tName:   tName,
		tout:    tout,
		conn:    connTS,
		connT:   connT,
		ts:      beanstalk.NewTubeSet(connTS, tName),
		t:       beanstalk.NewTube(connT, tName),
	}, nil
}
// Put forwards body, pri, delay and ttr to the pool's tube and returns the
// job id and error reported by it. The read lock is held for the duration of
// the call so the tube cannot be swapped by Redial mid-request.
func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
	cp.RLock()
	defer cp.RUnlock()

	jobID, putErr := cp.t.Put(body, pri, delay, ttr)
	return jobID, putErr
}
// Reserve asks the pool's tube set for a job, waiting up to reserveTimeout,
// and returns the job id and body it reports. Per the go-beanstalk
// documentation for TubeSet.Reserve, when no job becomes available in time
// the error is a ConnError recording ErrTimeout.
//
// A typical consumer reserves a job, performs the work, and then removes the
// job with Delete.
//
// The read lock is held for the whole call, so a concurrent Redial waits
// until the reserve returns.
func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
	cp.RLock()
	defer cp.RUnlock()

	id, body, err = cp.ts.Reserve(reserveTimeout)
	return id, body, err
}
// Delete removes the job identified by id, issuing the request on the same
// connection that backs the pool's tube set. The read lock is held for the
// duration of the call.
func (cp *ConnPool) Delete(id uint64) error {
	cp.RLock()
	defer cp.RUnlock()

	delErr := cp.conn.Delete(id)
	return delErr
}
// Redial establishes two fresh connections using the network, address and
// timeout the pool was created with, then swaps them (and the Tube/TubeSet
// built on them) into the pool under the write lock.
//
// Both connections are dialed before the lock is taken, so Put, Reserve and
// Delete are blocked only for the swap itself. If dialing fails, the pool is
// left untouched, any connection opened by this call is closed, and the dial
// error is returned unchanged.
//
// On success the connections that were replaced are closed.
// NOTE(review): closing the old consumer connection ends that client session
// on the server; confirm callers do not rely on jobs reserved through it
// after a Redial.
func (cp *ConnPool) Redial() error {
	const op = errors.Op("connection_pool_redial")

	connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
	if err != nil {
		return err
	}
	if connT == nil {
		return errors.E(op, errors.Str("connectionT is nil"))
	}

	connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
	if err != nil {
		// Do not leak the first connection when the second dial fails.
		_ = connT.Close()
		return err
	}
	if connTS == nil {
		_ = connT.Close()
		return errors.E(op, errors.Str("connectionTS is nil"))
	}

	cp.Lock()
	// Remember the connections being replaced so they can be closed once the
	// lock is released.
	oldT, oldTS := cp.connT, cp.conn
	cp.t = beanstalk.NewTube(connT, cp.tName)
	cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
	cp.conn = connTS
	cp.connT = connT
	cp.Unlock()

	// The write lock guaranteed no Put/Reserve/Delete was in flight during
	// the swap, and later callers see only the new connections, so the old
	// ones can be closed here. Close errors are ignored: these connections
	// are being discarded, typically because they are already broken.
	if oldT != nil {
		_ = oldT.Close()
	}
	if oldTS != nil {
		_ = oldTS.Close()
	}

	return nil
}
|