summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/consumer.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go22
1 files changed, 19 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 54c8318b..b57b22ac 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -36,7 +36,8 @@ type JobConsumer struct {
tubePriority uint32
priority int64
- stopCh chan struct{}
+ stopCh chan struct{}
+ requeueCh chan *Item
}
func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -88,9 +89,12 @@ func NewBeanstalkConsumer(configKey string, log logger.Logger, cfg config.Config
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
- reconnectCh: make(chan struct{}),
+ requeueCh: make(chan *Item, 1000),
+ reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
@@ -135,9 +139,12 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg config.Configu
// buffered with two because jobs root plugin can call Stop at the same time as Pause
stopCh: make(chan struct{}, 2),
+ requeueCh: make(chan *Item, 1000),
reconnectCh: make(chan struct{}, 2),
}
+ jc.requeueListener()
+
return jc, nil
}
func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
@@ -150,7 +157,16 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", jb.Options.Pipeline, pipe.Name()))
}
- item := fromJob(jb)
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (j *JobConsumer) handleItem(ctx context.Context, item *Item) error {
+ const op = errors.Op("beanstalk_handle_item")
bb := new(bytes.Buffer)
bb.Grow(64)