diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 22 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 30 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/listen.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/requeue.go | 24 |
5 files changed, 69 insertions, 13 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index ae223f39..32ca4188 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -61,6 +61,7 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t cp.RLock() defer cp.RUnlock() + // TODO(rustatian): redial based on the token id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error @@ -82,7 +83,6 @@ func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr t // // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. - func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() @@ -126,7 +126,7 @@ func (cp *ConnPool) redial() error { cp.Lock() // backoff here expb := backoff.NewExponentialBackOff() - // TODO set via config + // TODO(rustatian) set via config expb.MaxElapsedTime = time.Minute operation := func() error { diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 54c8318b..b57b22ac 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -36,7 +36,8 @@ type JobConsumer struct { tubePriority uint32 priority int64 - stopCh chan struct{} + stopCh chan struct{} + requeueCh chan *Item } func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), - reconnectCh: make(chan struct{}), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } @@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { @@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - item := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { + const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) bb.Grow(64) diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 7c792b46..91dbf41c 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -7,6 +7,7 @@ import ( "github.com/beanstalkd/go-beanstalk" json "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" ) @@ -40,12 +41,19 @@ type Options struct { // Delay defines time duration to delay execution for. Defaults to none. Delay int64 `json:"delay,omitempty"` - // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. + // Reserve defines for how broker should wait until treating job are failed. + // - <ttr> -- time to run -- is an integer number of seconds to allow a worker + // to run this job. This time is counted from the moment a worker reserves + // this job. If the worker does not delete, release, or bury the job within + // <ttr> seconds, the job will time out and the server will release the job. + // The minimum ttr is 1. If the client sends 0, the server will silently + // increase the ttr to 1. Maximum ttr is 2**32-1. Timeout int64 `json:"timeout,omitempty"` // Private ================ - id uint64 - conn *beanstalk.Conn + id uint64 + conn *beanstalk.Conn + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -103,8 +111,15 @@ func (i *Item) Nack() error { return i.Options.conn.Delete(i.Options.id) } -func (i *Item) Requeue(_ int64) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } func fromJob(job *job.Job) *Item { @@ -131,13 +146,14 @@ func (i *Item) pack(b *bytes.Buffer) error { return nil } -func unpack(id uint64, data []byte, conn *beanstalk.Conn, out *Item) error { +func (j *JobConsumer) unpack(id uint64, data []byte, out *Item) error { err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(out) if err != nil { return err } - out.Options.conn = conn + out.Options.conn = j.pool.conn out.Options.id = id + out.Options.requeueCh = j.requeueCh return nil } diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/jobs/drivers/beanstalk/listen.go index aaf635b1..f1385e70 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/jobs/drivers/beanstalk/listen.go @@ -26,7 +26,7 @@ func (j *JobConsumer) listen() { } item := &Item{} - err = unpack(id, body, j.pool.conn, item) + err = j.unpack(id, body, item) if err != nil { j.log.Error("beanstalk unpack item", "error", err) continue diff --git a/plugins/jobs/drivers/beanstalk/requeue.go b/plugins/jobs/drivers/beanstalk/requeue.go new file mode 100644 index 00000000..21053940 --- /dev/null +++ b/plugins/jobs/drivers/beanstalk/requeue.go @@ -0,0 +1,24 @@ +package beanstalk + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} |