Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r--  plugins/jobs/drivers/ephemeral/consumer.go  115
1 file changed, 71 insertions(+), 44 deletions(-)
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 043da118..050d74b9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -31,9 +31,11 @@ type JobConsumer struct {
pq priorityqueue.Queue
localPrefetch chan *Item
+ // max number of goroutines allowed to sleep on a job delay
goroutinesMaxNum uint64
- stopCh chan struct{}
+ requeueCh chan *Item
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
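The new requeueCh field is a buffered mailbox for items that need to re-enter the queue. The Item type and requeueListener are defined outside this diff, so the following is a standalone sketch (all names illustrative) of why the mailbox is buffered: up to cap(ch) failed items can be parked without blocking the goroutine that hands them back.

    package main

    import "fmt"

    // Illustrative sketch only: a buffered channel used as a requeue
    // mailbox. Sends succeed without a ready receiver until the buffer
    // (1000 in this driver) is full.
    func main() {
        mailbox := make(chan int, 3)
        for i := 1; i <= 3; i++ {
            mailbox <- i // none of these sends block
        }
        fmt.Println(len(mailbox), cap(mailbox)) // 3 3
        for len(mailbox) > 0 {
            fmt.Println("requeued item", <-mailbox)
        }
    }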
@@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
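Note that consume is started with go while requeueListener is called synchronously, which suggests the listener spawns its own goroutine internally (its body is not part of this diff). A minimal, self-contained sketch of the consumer wiring both constructors now share, with illustrative names:

    package main

    import (
        "fmt"
        "time"
    )

    // Sketch of the wiring: one goroutine drains a bounded prefetch
    // channel until it is told to stop.
    func main() {
        localPrefetch := make(chan string, 4)
        stopCh := make(chan struct{}, 1) // buffered, as in the constructor
        done := make(chan struct{})

        go func() { // mirrors `go jb.consume()`
            defer close(done)
            for {
                select {
                case item := <-localPrefetch:
                    fmt.Println("insert into priority queue:", item)
                case <-stopCh:
                    return
                }
            }
        }()

        localPrefetch <- "job-1"
        time.Sleep(10 * time.Millisecond) // sketch only: let the item drain
        stopCh <- struct{}{}
        <-done
    }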
@@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// initialize a local queue
@@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
// consume from the queue
go jb.consume()
+ jb.requeueListener()
return jb, nil
}
@@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
const op = errors.Op("ephemeral_push")
// check if the pipeline registered
- if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok {
- if !b.(bool) {
- return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
- }
-
- msg := fromJob(jb)
- // handle timeouts
- // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example)
- // goroutines here. We should limit goroutines here.
- if msg.Options.Delay > 0 {
- // if we have 1000 goroutines waiting on the delay - reject 1001
- if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
- return errors.E(op, errors.Str("max concurrency number reached"))
- }
+ b, ok := j.pipeline.Load(jb.Options.Pipeline)
+ if !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ }
- go func(jj *job.Job) {
- atomic.AddUint64(&j.goroutinesMaxNum, 1)
- time.Sleep(jj.Options.DelayDuration())
+ if !b.(bool) {
+ return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline))
+ }
- // send the item after timeout expired
- j.localPrefetch <- msg
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
- atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
- }(jb)
+ return nil
+}
- return nil
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
+ const op = errors.Op("ephemeral_handle_request")
+ // handle timeouts
+ // theoretically, a bad user may send millions of delayed requests and spawn a billion (for example)
+ // goroutines. We should limit the number of goroutines here.
+ if msg.Options.Delay > 0 {
+ // if 1000 goroutines are already waiting on a delay - reject the 1001st
+ if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
+ return errors.E(op, errors.Str("max concurrency number reached"))
}
- // insert to the local, limited pipeline
- select {
- case j.localPrefetch <- msg:
- default:
- return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch))
- }
+ go func(jj *Item) {
+ atomic.AddUint64(&j.goroutinesMaxNum, 1)
+ time.Sleep(jj.Options.DelayDuration())
+
+ // send the item after timeout expired
+ j.localPrefetch <- jj
+
+ atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
+ }(msg)
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline))
+ // insert into the local, limited pipeline
+ select {
+ case j.localPrefetch <- msg:
+ return nil
+ case <-ctx.Done():
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err()))
+ }
}
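Two details of handleItem are worth spelling out. First, the delayed path caps the number of sleeping goroutines with an atomic counter; atomic.AddUint64(&n, ^uint64(0)) is the sync/atomic documentation's idiom for subtracting 1 from an unsigned counter. Second, the full-prefetch case now blocks until either the send succeeds or the caller's context is cancelled, instead of failing immediately through a default branch. A standalone sketch of the limiter under those assumptions, with a counter that starts at zero and a separate constant for the cap:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // Illustrative limiter: at most `limit` goroutines may sleep on a
    // delay at once; the counter drops when the sleep ends.
    var sleeping uint64

    const limit = 1000 // the driver's hard cap on delayed items

    func scheduleDelayed(out chan<- string, payload string, delay time.Duration) error {
        if atomic.LoadUint64(&sleeping) >= limit {
            return fmt.Errorf("max concurrency number reached")
        }
        atomic.AddUint64(&sleeping, 1)
        go func() {
            defer atomic.AddUint64(&sleeping, ^uint64(0)) // decrement by 1
            time.Sleep(delay)
            out <- payload // deliver once the delay has expired
        }()
        return nil
    }

    func main() {
        out := make(chan string, 1)
        if err := scheduleDelayed(out, "delayed-job", 20*time.Millisecond); err == nil {
            fmt.Println(<-out)
        }
    }

Incrementing before the goroutine starts (the diff does it inside the goroutine) narrows the window in which concurrent pushes can slip past the check; closing it entirely would take a CAS loop.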
func (j *JobConsumer) consume() {
@@ -133,6 +148,9 @@ func (j *JobConsumer) consume() {
for {
select {
case item := <-j.localPrefetch:
+
+ // set requeue channel
+ item.Options.requeueCh = j.requeueCh
j.pq.Insert(item)
case <-j.stopCh:
return
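The stamping above (item.Options.requeueCh = j.requeueCh) is what closes the loop: every item that reaches the priority queue carries a way back into the consumer. The Item side of that contract is outside this diff; a standalone sketch of the round trip, with all names illustrative:

    package main

    import "fmt"

    // Illustrative round trip: the consumer stamps its requeue channel
    // onto each item, so a later Requeue can hand the item back without
    // the item knowing anything else about the consumer.
    type item struct {
        payload   string
        requeueCh chan<- *item
    }

    func (i *item) Requeue() {
        i.requeueCh <- i // park the item in the consumer's mailbox
    }

    func main() {
        requeueCh := make(chan *item, 1000)

        it := &item{payload: "job-1"}
        it.requeueCh = requeueCh // mirrors item.Options.requeueCh = j.requeueCh
        it.Requeue()

        fmt.Println("back in the mailbox:", (<-requeueCh).payload)
    }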
@@ -140,7 +158,7 @@ func (j *JobConsumer) consume() {
}
}
-func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error {
+func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error {
const op = errors.Op("ephemeral_register")
if _, ok := j.pipeline.Load(pipeline.Name()); ok {
return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline))
@@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline)
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Pause(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == true {
// mark pipeline as turned off
j.pipeline.Store(pipeline, false)
}
+ // if the pipe was not active - do not send EventPipeStopped, because the pipe is already stopped
+ return
}
j.eh.Push(events.JobEvent{
@@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) {
})
}
-func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
+func (j *JobConsumer) Resume(_ context.Context, pipeline string) {
if q, ok := j.pipeline.Load(pipeline); ok {
if q == false {
// mark pipeline as turned on
j.pipeline.Store(pipeline, true)
}
+
+ // if the pipe was already active - do not send EventPipeActive again
+ return
}
j.eh.Push(events.JobEvent{
@@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) {
}
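Pause and Resume now return early when the pipeline exists, and both operate on the same structure Push consults: a sync.Map from pipeline name to an active flag. A standalone sketch of that state machine, using illustrative names:

    package main

    import (
        "fmt"
        "sync"
    )

    // Illustrative state machine: pipeline name -> active flag, the same
    // map Push checks before accepting a job.
    func main() {
        var pipelines sync.Map
        pipelines.Store("test-pipeline", true) // Register marks it active

        // Pause: flip true -> false; pausing twice is a no-op.
        if q, ok := pipelines.Load("test-pipeline"); ok && q.(bool) {
            pipelines.Store("test-pipeline", false)
        }

        // Push-side check: a disabled pipeline rejects new jobs.
        if q, ok := pipelines.Load("test-pipeline"); ok && !q.(bool) {
            fmt.Println("pipeline disabled: test-pipeline")
        }

        // Resume: flip false -> true.
        if q, ok := pipelines.Load("test-pipeline"); ok && !q.(bool) {
            pipelines.Store("test-pipeline", true)
        }
    }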
// Run is no-op for the ephemeral
-func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error {
j.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Driver: pipe.Driver(),
@@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Stop(context.Context) error {
+func (j *JobConsumer) Stop(ctx context.Context) error {
+ const op = errors.Op("ephemeral_plugin_stop")
var pipe string
j.pipeline.Range(func(key, _ interface{}) bool {
pipe = key.(string)
@@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error {
return true
})
+ select {
// return from the consumer
- j.stopCh <- struct{}{}
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
- Pipeline: pipe,
- Start: time.Now(),
- Elapsed: 0,
- })
+ case j.stopCh <- struct{}{}:
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeStopped,
+ Pipeline: pipe,
+ Start: time.Now(),
+ Elapsed: 0,
+ })
+ return nil
- return nil
+ case <-ctx.Done():
+ return errors.E(op, ctx.Err())
+ }
}
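The reworked Stop races the send on stopCh against ctx.Done(), so a shutdown deadline is now honored instead of blocking forever if the consumer is gone. A self-contained sketch of the pattern; the unbuffered, never-drained channel below forces the timeout path on purpose:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // stop mirrors the select in Stop above: whichever of the two cases
    // becomes ready first decides the outcome.
    func stop(ctx context.Context, stopCh chan struct{}) error {
        select {
        case stopCh <- struct{}{}:
            return nil // consumer acknowledged the stop signal
        case <-ctx.Done():
            return ctx.Err() // deadline hit before the send could happen
        }
    }

    func main() {
        stopCh := make(chan struct{}) // worst case: nobody is receiving
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()
        fmt.Println(stop(ctx, stopCh)) // context deadline exceeded
    }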