summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r--plugins/jobs/drivers/beanstalk/config.go2
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go100
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go77
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go17
-rw-r--r--plugins/jobs/drivers/beanstalk/listen.go56
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go34
6 files changed, 223 insertions, 63 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go
index f05ee122..6a8bda1d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/jobs/drivers/beanstalk/config.go
@@ -30,7 +30,7 @@ func (c *Config) InitDefault() {
}
if c.ReserveTimeout == 0 {
- c.ReserveTimeout = time.Second * 5
+ c.ReserveTimeout = time.Second * 1
}
if c.PipePriority == 0 {
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
new file mode 100644
index 00000000..fd7a3902
--- /dev/null
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -0,0 +1,100 @@
+package beanstalk
+
+import (
+ "sync"
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/spiral/errors"
+)
+
+type ConnPool struct {
+ sync.RWMutex
+ conn *beanstalk.Conn
+ connT *beanstalk.Conn
+ ts *beanstalk.TubeSet
+ t *beanstalk.Tube
+
+ network string
+ address string
+ tName string
+ tout time.Duration
+}
+
+func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) {
+ connT, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ connTS, err := beanstalk.DialTimeout(network, address, tout)
+ if err != nil {
+ return nil, err
+ }
+
+ tube := beanstalk.NewTube(connT, tName)
+ ts := beanstalk.NewTubeSet(connTS, tName)
+
+ return &ConnPool{
+ network: network,
+ address: address,
+ tName: tName,
+ tout: tout,
+ conn: connTS,
+ connT: connT,
+ ts: ts,
+ t: tube,
+ }, nil
+}
+
+func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.t.Put(body, pri, delay, ttr)
+}
+
+// Reserve reserves and returns a job from one of the tubes in t. If no
+// job is available before time timeout has passed, Reserve returns a
+// ConnError recording ErrTimeout.
+//
+// Typically, a client will reserve a job, perform some work, then delete
+// the job with Conn.Delete.
+func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.ts.Reserve(reserveTimeout)
+}
+
+func (cp *ConnPool) Delete(id uint64) error {
+ cp.RLock()
+ defer cp.RUnlock()
+ return cp.conn.Delete(id)
+}
+
+func (cp *ConnPool) Redial() error {
+ const op = errors.Op("connection_pool_redial")
+ connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+ if connT == nil {
+ return errors.E(op, errors.Str("connectionT is nil"))
+ }
+
+ connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
+ if err != nil {
+ return err
+ }
+
+ if connTS == nil {
+ return errors.E(op, errors.Str("connectionTS is nil"))
+ }
+
+ cp.Lock()
+ cp.t = beanstalk.NewTube(connT, cp.tName)
+ cp.ts = beanstalk.NewTubeSet(connTS, cp.tName)
+ cp.conn = connTS
+ cp.connT = connT
+ cp.Unlock()
+ return nil
+}
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 27d453f4..cce85c99 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,11 +3,9 @@ package beanstalk
import (
"bytes"
"strings"
- "sync"
"sync/atomic"
"time"
- "github.com/beanstalkd/go-beanstalk"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -18,7 +16,6 @@ import (
)
type JobConsumer struct {
- sync.Mutex
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -27,11 +24,9 @@ type JobConsumer struct {
listeners uint32
// beanstalk
+ pool *ConnPool
addr string
network string
- conn *beanstalk.Conn
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
reserveTimeout time.Duration
reconnectCh chan struct{}
tout time.Duration
@@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
// initialize job consumer
jc := &JobConsumer{
pq: pq,
log: log,
eh: e,
+ pool: cPool,
network: dsn[0],
addr: dsn[1],
tout: globalCfg.Timeout,
- reserveTimeout: pipeCfg.ReserveTimeout,
tName: pipeCfg.Tube,
+ reserveTimeout: pipeCfg.ReserveTimeout,
tubePriority: pipeCfg.TubePriority,
priority: pipeCfg.PipePriority,
@@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}),
}
- jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout)
- if err != nil {
- return nil, err
- }
-
- jc.tube = beanstalk.NewTube(jc.conn, jc.tName)
- jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName)
-
// start redial listener
go jc.redial()
@@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
globalCfg.InitDefault()
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, ""),
- reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- stopCh: make(chan struct{}),
- reconnectCh: make(chan struct{}),
- }
-
// PARSE CONFIGURATION -------
dsn := strings.Split(globalCfg.Addr, "://")
@@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, "default"),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ tubePriority: uint32(pipe.Int(tube, 10)),
+ priority: pipe.Priority(),
+
+ // buffered with two because jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ reconnectCh: make(chan struct{}, 2),
}
// start redial listener
@@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- // reconnect protection
- j.Lock()
- defer j.Unlock()
-
item := fromJob(jb)
bb := new(bytes.Buffer)
@@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
- errD := j.conn.Delete(id)
+ // TODO check for the connection error
+ errD := j.pool.Delete(id)
if errD != nil {
return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
}
@@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- j.Lock()
- defer j.Unlock()
-
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 329d4c8d..2c2873c2 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/beanstalkd/go-beanstalk"
+ json "github.com/json-iterator/go"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -77,7 +78,21 @@ func (i *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
func (i *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (i *Item) Ack() error {
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go
index 873930d5..33dd4fe5 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/jobs/drivers/beanstalk/listen.go
@@ -1,35 +1,69 @@
package beanstalk
-func (j *JobConsumer) listen() {
+import (
+ "time"
+
+ "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
+ "github.com/spiral/errors"
+)
+
+func (j *JobConsumer) listen() { //nolint:gocognit
+ const op = errors.Op("beanstalk_listen")
for {
select {
case <-j.stopCh:
j.log.Warn("beanstalk listener stopped")
return
-
default:
- // lock used here to prevent consume from the broken connection
- j.Lock()
-
- id, body, err := j.tubeSet.Reserve(j.reserveTimeout)
+ id, body, err := j.pool.Reserve(j.reserveTimeout)
if err != nil {
+ // reserve timeout
+ if connErr, ok := err.(beanstalk.ConnError); ok {
+ switch connErr.Err {
+ case beanstalk.ErrTimeout:
+ j.log.Warn("timeout expired", "warn", connErr.Error())
+ continue
+ default:
+ j.log.Error("beanstalk connection error", "error", connErr.Error())
+
+ // backoff here
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ operation := func() error {
+ errR := j.pool.Redial()
+ if errR != nil {
+ return errors.E(op, errR)
+ }
+
+ j.log.Info("beanstalk redial was successful")
+ // reassign a pool
+ return nil
+ }
+
+ retryErr := backoff.Retry(operation, expb)
+ if retryErr != nil {
+ j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr)
+ return
+ }
+ continue
+ }
+ }
j.log.Error("beanstalk reserve", "error", err)
- j.Unlock()
continue
}
item := &Item{}
- err = unpack(id, body, j.conn, item)
+ err = unpack(id, body, j.pool.conn, item)
if err != nil {
j.log.Error("beanstalk unpack item", "error", err)
- j.Unlock()
continue
}
// insert job into the priority queue
j.pq.Insert(item)
-
- j.Unlock()
}
}
}
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
index e1922517..784337ad 100644
--- a/plugins/jobs/drivers/beanstalk/redial.go
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -2,30 +2,40 @@ package beanstalk
import (
"sync/atomic"
+ "time"
- "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
)
func (j *JobConsumer) redial() {
for range j.reconnectCh {
// backoff here
-
- j.Lock()
-
- var err error
- j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout)
- if err != nil {
- panic(err)
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ op := func() error {
+ err := j.pool.Redial()
+ if err != nil {
+ return err
+ }
+
+ j.log.Info("beanstalk redial was successful")
+ // reassign a pool
+ return nil
}
- j.tube = beanstalk.NewTube(j.conn, j.tName)
- j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName)
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.log.Error("beanstalk backoff failed", "error", retryErr)
+ continue
+ }
// restart listener
if atomic.LoadUint32(&j.listeners) == 1 {
+ // stop previous listener
+ j.stopCh <- struct{}{}
go j.listen()
}
-
- j.Unlock()
}
}