summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go77
1 files changed, 39 insertions, 38 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 27d453f4..cce85c99 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -3,11 +3,9 @@ package beanstalk
import (
"bytes"
"strings"
- "sync"
"sync/atomic"
"time"
- "github.com/beanstalkd/go-beanstalk"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
@@ -18,7 +16,6 @@ import (
)
type JobConsumer struct {
- sync.Mutex
log logger.Logger
eh events.Handler
pq priorityqueue.Queue
@@ -27,11 +24,9 @@ type JobConsumer struct {
listeners uint32
// beanstalk
+ pool *ConnPool
addr string
network string
- conn *beanstalk.Conn
- tube *beanstalk.Tube
- tubeSet *beanstalk.TubeSet
reserveTimeout time.Duration
reconnectCh chan struct{}
tout time.Duration
@@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
// initialize job consumer
jc := &JobConsumer{
pq: pq,
log: log,
eh: e,
+ pool: cPool,
network: dsn[0],
addr: dsn[1],
tout: globalCfg.Timeout,
- reserveTimeout: pipeCfg.ReserveTimeout,
tName: pipeCfg.Tube,
+ reserveTimeout: pipeCfg.ReserveTimeout,
tubePriority: pipeCfg.TubePriority,
priority: pipeCfg.PipePriority,
@@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
reconnectCh: make(chan struct{}),
}
- jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout)
- if err != nil {
- return nil, err
- }
-
- jc.tube = beanstalk.NewTube(jc.conn, jc.tName)
- jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName)
-
// start redial listener
go jc.redial()
@@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
globalCfg.InitDefault()
- // initialize job consumer
- jc := &JobConsumer{
- pq: pq,
- log: log,
- eh: e,
- tout: globalCfg.Timeout,
- tName: pipe.String(tube, ""),
- reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
- stopCh: make(chan struct{}),
- reconnectCh: make(chan struct{}),
- }
-
// PARSE CONFIGURATION -------
dsn := strings.Split(globalCfg.Addr, "://")
@@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr))
}
- jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout)
+ cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout)
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
+ }
+
+ // initialize job consumer
+ jc := &JobConsumer{
+ pq: pq,
+ log: log,
+ eh: e,
+ pool: cPool,
+ network: dsn[0],
+ addr: dsn[1],
+ tout: globalCfg.Timeout,
+ tName: pipe.String(tube, "default"),
+ reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)),
+ tubePriority: uint32(pipe.Int(tube, 10)),
+ priority: pipe.Priority(),
+
+ // buffered with two because jobs root plugin can call Stop at the same time as Pause
+ stopCh: make(chan struct{}, 2),
+ reconnectCh: make(chan struct{}, 2),
}
// start redial listener
@@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- // reconnect protection
- j.Lock()
- defer j.Unlock()
-
item := fromJob(jb)
bb := new(bytes.Buffer)
@@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// <ttr> seconds, the job will time out and the server will release the job.
// The minimum ttr is 1. If the client sends 0, the server will silently
// increase the ttr to 1. Maximum ttr is 2**32-1.
- id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
+ id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration())
if err != nil {
- errD := j.conn.Delete(id)
+ // TODO check for the connection error
+ errD := j.pool.Delete(id)
if errD != nil {
return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error()))
}
@@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
atomic.AddUint32(&j.listeners, 1)
- j.Lock()
- defer j.Unlock()
-
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}