diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 115 |
1 files changed, 71 insertions, 44 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 043da118..050d74b9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -31,9 +31,11 @@ type JobConsumer struct { pq priorityqueue.Queue localPrefetch chan *Item + // time.sleep goroutines max number goroutinesMaxNum uint64 - stopCh chan struct{} + requeueCh chan *Item + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - - msg := fromJob(jb) - // handle timeouts - // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { - return errors.E(op, errors.Str("max concurrency number reached")) - } + b, ok := j.pipeline.Load(jb.Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } - go func(jj *job.Job) { - atomic.AddUint64(&j.goroutinesMaxNum, 1) - time.Sleep(jj.Options.DelayDuration()) + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) + } - // send the item after timeout expired - j.localPrefetch <- msg + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } - atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) - }(jb) + return nil +} - return nil +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { + return errors.E(op, errors.Str("max concurrency number reached")) } - // insert to the local, limited pipeline - select { - case j.localPrefetch <- msg: - default: - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch)) - } + go func(jj *Item) { + atomic.AddUint64(&j.goroutinesMaxNum, 1) + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + j.localPrefetch <- jj + + atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) + }(msg) return nil } - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + // insert to the local, limited pipeline + select { + case j.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + } } func (j *JobConsumer) consume() { @@ -133,6 +148,9 @@ func (j *JobConsumer) consume() { for { select { case item := <-j.localPrefetch: + + // set requeue channel + item.Options.requeueCh = j.requeueCh j.pq.Insert(item) case <-j.stopCh: return @@ -140,7 +158,7 @@ func (j *JobConsumer) consume() { } } -func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) return nil } -func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { +func (j *JobConsumer) Pause(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off j.pipeline.Store(pipeline, false) } + // if not true - do not send the EventPipeStopped, because pipe already stopped + return } j.eh.Push(events.JobEvent{ @@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { +func (j *JobConsumer) Resume(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on j.pipeline.Store(pipeline, true) } + + // if not true - do not send the EventPipeActive, because pipe already active + return } j.eh.Push(events.JobEvent{ @@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *JobConsumer) Stop(ctx context.Context) error { + const op = errors.Op("ephemeral_plugin_stop") var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) @@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error { return true }) + select { // return from the consumer - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe, - Start: time.Now(), - Elapsed: 0, - }) + case j.stopCh <- struct{}{}: + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + return nil - return nil + case <-ctx.Done(): + return errors.E(op, ctx.Err()) + } } |