diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 54c8318b..b57b22ac 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -36,7 +36,8 @@ type JobConsumer struct { tubePriority uint32 priority int64 - stopCh chan struct{} + stopCh chan struct{} + requeueCh chan *Item } func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), - reconnectCh: make(chan struct{}), + requeueCh: make(chan *Item, 1000), + reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } @@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu // buffered with two because jobs root plugin can call Stop at the same time as Pause stopCh: make(chan struct{}, 2), + requeueCh: make(chan *Item, 1000), reconnectCh: make(chan struct{}, 2), } + jc.requeueListener() + return jc, nil } func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { @@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name())) } - item := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error { + const op = errors.Op("beanstalk_handle_item") bb := new(bytes.Buffer) bb.Grow(64) |