diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/config.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 100 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 77 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 56 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 34 |
6 files changed, 223 insertions, 63 deletions
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/jobs/drivers/beanstalk/config.go index f05ee122..6a8bda1d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/jobs/drivers/beanstalk/config.go @@ -30,7 +30,7 @@ func (c *Config) InitDefault() { } if c.ReserveTimeout == 0 { - c.ReserveTimeout = time.Second * 5 + c.ReserveTimeout = time.Second * 1 } if c.PipePriority == 0 { diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go new file mode 100644 index 00000000..fd7a3902 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -0,0 +1,100 @@ +package beanstalk + +import ( + "sync" + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/spiral/errors" +) + +type ConnPool struct { + sync.RWMutex + conn *beanstalk.Conn + connT *beanstalk.Conn + ts *beanstalk.TubeSet + t *beanstalk.Tube + + network string + address string + tName string + tout time.Duration +} + +func NewConnPool(network, address, tName string, tout time.Duration) (*ConnPool, error) { + connT, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + connTS, err := beanstalk.DialTimeout(network, address, tout) + if err != nil { + return nil, err + } + + tube := beanstalk.NewTube(connT, tName) + ts := beanstalk.NewTubeSet(connTS, tName) + + return &ConnPool{ + network: network, + address: address, + tName: tName, + tout: tout, + conn: connTS, + connT: connT, + ts: ts, + t: tube, + }, nil +} + +func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { + cp.RLock() + defer cp.RUnlock() + return cp.t.Put(body, pri, delay, ttr) +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (id uint64, body []byte, err error) { + cp.RLock() + defer cp.RUnlock() + return cp.ts.Reserve(reserveTimeout) +} + +func (cp *ConnPool) Delete(id uint64) error { + cp.RLock() + defer cp.RUnlock() + return cp.conn.Delete(id) +} + +func (cp *ConnPool) Redial() error { + const op = errors.Op("connection_pool_redial") + connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + if connT == nil { + return errors.E(op, errors.Str("connectionT is nil")) + } + + connTS, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) + if err != nil { + return err + } + + if connTS == nil { + return errors.E(op, errors.Str("connectionTS is nil")) + } + + cp.Lock() + cp.t = beanstalk.NewTube(connT, cp.tName) + cp.ts = beanstalk.NewTubeSet(connTS, cp.tName) + cp.conn = connTS + cp.connT = connT + cp.Unlock() + return nil +} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 27d453f4..cce85c99 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -3,11 +3,9 @@ package beanstalk import ( "bytes" "strings" - "sync" "sync/atomic" "time" - "github.com/beanstalkd/go-beanstalk" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" @@ -18,7 +16,6 @@ import ( ) type JobConsumer struct { - sync.Mutex log logger.Logger eh events.Handler pq priorityqueue.Queue @@ -27,11 +24,9 @@ type JobConsumer struct { listeners uint32 // beanstalk + pool *ConnPool addr string network string - conn *beanstalk.Conn - tube *beanstalk.Tube - tubeSet *beanstalk.TubeSet reserveTimeout time.Duration reconnectCh chan struct{} tout time.Duration @@ -71,16 +66,22 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } + cPool, err := NewConnPool(dsn[0], dsn[1], pipeCfg.Tube, globalCfg.Timeout) + if err != nil { + return nil, errors.E(op, err) + } + // initialize job consumer jc := &JobConsumer{ pq: pq, log: log, eh: e, + pool: cPool, network: dsn[0], addr: dsn[1], tout: globalCfg.Timeout, - reserveTimeout: pipeCfg.ReserveTimeout, tName: pipeCfg.Tube, + reserveTimeout: pipeCfg.ReserveTimeout, tubePriority: pipeCfg.TubePriority, priority: pipeCfg.PipePriority, @@ -89,14 +90,6 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config reconnectCh: make(chan struct{}), } - jc.conn, err = beanstalk.DialTimeout(jc.network, jc.addr, jc.tout) - if err != nil { - return nil, err - } - - jc.tube = beanstalk.NewTube(jc.conn, jc.tName) - jc.tubeSet = beanstalk.NewTubeSet(jc.conn, jc.tName) - // start redial listener go jc.redial() @@ -121,18 +114,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu globalCfg.InitDefault() - // initialize job consumer - jc := &JobConsumer{ - pq: pq, - log: log, - eh: e, - tout: globalCfg.Timeout, - tName: pipe.String(tube, ""), - reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), - stopCh: make(chan struct{}), - reconnectCh: make(chan struct{}), - } - // PARSE CONFIGURATION ------- dsn := strings.Split(globalCfg.Addr, "://") @@ -140,9 +121,28 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu return nil, errors.E(op, errors.Errorf("invalid socket DSN (tcp://localhost:11300, unix://beanstalk.sock), provided: %s", globalCfg.Addr)) } - jc.conn, err = beanstalk.DialTimeout(dsn[0], dsn[1], jc.tout) + cPool, err := NewConnPool(dsn[0], dsn[1], pipe.String(tube, "default"), globalCfg.Timeout) if err != nil { - return nil, err + return nil, errors.E(op, err) + } + + // initialize job consumer + jc := &JobConsumer{ + pq: pq, + log: log, + eh: e, + pool: cPool, + network: dsn[0], + addr: dsn[1], + tout: globalCfg.Timeout, + tName: pipe.String(tube, "default"), + reserveTimeout: time.Second * time.Duration(pipe.Int(reserveTimeout, 5)), + tubePriority: uint32(pipe.Int(tube, 10)), + priority: pipe.Priority(), + + // buffered with two because jobs root plugin can call Stop at the same time as Pause + stopCh: make(chan struct{}, 2), + reconnectCh: make(chan struct{}, 2), } // start redial listener @@ -160,10 +160,6 @@ func (j *JobConsumer) Push(jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - // reconnect protection - j.Lock() - defer j.Unlock() - item := fromJob(jb) bb := new(bytes.Buffer) @@ -188,9 +184,10 @@ func (j *JobConsumer) Push(jb *job.Job) error { // <ttr> seconds, the job will time out and the server will release the job. // The minimum ttr is 1. If the client sends 0, the server will silently // increase the ttr to 1. Maximum ttr is 2**32-1. - id, err := j.tube.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) + id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - errD := j.conn.Delete(id) + // TODO check for the connection error + errD := j.pool.Delete(id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) } @@ -218,11 +215,15 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { atomic.AddUint32(&j.listeners, 1) - j.Lock() - defer j.Unlock() - go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 329d4c8d..2c2873c2 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -6,6 +6,7 @@ import ( "time" "github.com/beanstalkd/go-beanstalk" + json "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -77,7 +78,21 @@ func (i *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead func (i *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (i *Item) Ack() error { diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index 873930d5..33dd4fe5 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -1,35 +1,69 @@ package beanstalk -func (j *JobConsumer) listen() { +import ( + "time" + + "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" + "github.com/spiral/errors" +) + +func (j *JobConsumer) listen() { //nolint:gocognit + const op = errors.Op("beanstalk_listen") for { select { case <-j.stopCh: j.log.Warn("beanstalk listener stopped") return - default: - // lock used here to prevent consume from the broken connection - j.Lock() - - id, body, err := j.tubeSet.Reserve(j.reserveTimeout) + id, body, err := j.pool.Reserve(j.reserveTimeout) if err != nil { + // reserve timeout + if connErr, ok := err.(beanstalk.ConnError); ok { + switch connErr.Err { + case beanstalk.ErrTimeout: + j.log.Warn("timeout expired", "warn", connErr.Error()) + continue + default: + j.log.Error("beanstalk connection error", "error", connErr.Error()) + + // backoff here + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + operation := func() error { + errR := j.pool.Redial() + if errR != nil { + return errors.E(op, errR) + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil + } + + retryErr := backoff.Retry(operation, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed, exiting from listener", "error", connErr, "retry error", retryErr) + return + } + continue + } + } j.log.Error("beanstalk reserve", "error", err) - j.Unlock() continue } item := &Item{} - err = unpack(id, body, j.conn, item) + err = unpack(id, body, j.pool.conn, item) if err != nil { j.log.Error("beanstalk unpack item", "error", err) - j.Unlock() continue } // insert job into the priority queue j.pq.Insert(item) - - j.Unlock() } } } diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go index e1922517..784337ad 100644 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -2,30 +2,40 @@ package beanstalk import ( "sync/atomic" + "time" - "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" ) func (j *JobConsumer) redial() { for range j.reconnectCh { // backoff here - - j.Lock() - - var err error - j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout) - if err != nil { - panic(err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + op := func() error { + err := j.pool.Redial() + if err != nil { + return err + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil } - j.tube = beanstalk.NewTube(j.conn, j.tName) - j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName) + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed", "error", retryErr) + continue + } // restart listener if atomic.LoadUint32(&j.listeners) == 1 { + // stop previous listener + j.stopCh <- struct{}{} go j.listen() } - - j.Unlock() } } |