diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 77 |
1 files changed, 39 insertions, 38 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 27d453f4..cce85c99 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,11 +3,9 @@ package beanstalk import ( "bytes" "strings" - "sync" "sync/atomic" "time" - "github.com/beanstalkd/go-beanstalk" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -18,7 +16,6 @@ import ( ) type JobConsumer struct { - sync.Mutex log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -27,11 +24,9 @@ type JobConsumer struct { listeners uint32 // beanstalk + pool *ConnPool addr string network string - conn *beanstalk.Conn - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet reserveTimeout time.Duration reconnectCh chan struct{} tout time.Duration @@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout) + if err != nil { + return nil, errors.E(op, err) + } + // initialize job consumer jc := &JobConsumer{ pq: pq, log: log, eh: e, + pool: cPool, network: dsn[0], addr: dsn[1], tout: globalCfg.Timeout, - reserveTimeout: pipeCfg.ReserveTimeout, tName: pipeCfg.Tube, + reserveTimeout: pipeCfg.ReserveTimeout, tubePriority: pipeCfg.TubePriority, priority: pipeCfg.PipePriority, @@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}), } - jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout) - if err != nil { - return nil, err - } - - jc.tube = beanstalk.NewTube(jc.conn, jc.tName) - jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName) - // start redial listener go jc.redial() @@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu globalCfg.InitDefault() - // initialize job consumer - jc := &JobConsumer{ - pq: pq, - log: log, - eh: e, - tout: globalCfg.Timeout, - tName: pipe.String(tube, ""), - reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - stopCh: make(chan struct{}), - reconnectCh: make(chan struct{}), - } - // PARSE CONFIGURATION ------- dsn := strings.Split(globalCfg.Addr, "://") @@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout) if err != nil { - return nil, err + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipe.String(tube, "default"), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + tubePriority: uint32(pipe.Int(tube, 10)), + priority: pipe.Priority(), + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + reconnectCh: make(chan struct{}, 2), } // start redial listener @@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - // reconnect protection - j.Lock() - defer j.Unlock() - item := fromJob(jb) bb := new(bytes.Buffer) @@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - errD := j.conn.Delete(id) + // TODO check for the connection error + errD := j.pool.Delete(id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) } @@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - j.Lock() - defer j.Unlock() - go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } |