diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 115 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/item.go | 46 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/requeue.go | 25 |
3 files changed, 127 insertions, 59 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 043da118..050d74b9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -31,9 +31,11 @@ type JobConsumer struct { pq priorityqueue.Queue localPrefetch chan *Item + // time.sleep goroutines max number goroutinesMaxNum uint64 - stopCh chan struct{} + requeueCh chan *Item + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -45,6 +47,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -61,6 +64,7 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -72,6 +76,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -79,6 +84,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand // consume from the queue go jb.consume() + jb.requeueListener() return jb, nil } @@ -87,45 +93,54 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { const op = errors.Op("ephemeral_push") // check if the pipeline registered - if b, ok := j.pipeline.Load(jb.Options.Pipeline); ok { - if !b.(bool) { - return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) - } - - msg := fromJob(jb) - // handle timeouts - // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example) - // goroutines here. We should limit goroutines here. - if msg.Options.Delay > 0 { - // if we have 1000 goroutines waiting on the delay - reject 1001 - if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { - return errors.E(op, errors.Str("max concurrency number reached")) - } + b, ok := j.pipeline.Load(jb.Options.Pipeline) + if !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + } - go func(jj *job.Job) { - atomic.AddUint64(&j.goroutinesMaxNum, 1) - time.Sleep(jj.Options.DelayDuration()) + if !b.(bool) { + return errors.E(op, errors.Errorf("pipeline disabled: %s", jb.Options.Pipeline)) + } - // send the item after timeout expired - j.localPrefetch <- msg + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } - atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) - }(jb) + return nil +} - return nil +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("ephemeral_handle_request") + // handle timeouts + // theoretically, some bad user may send millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { + return errors.E(op, errors.Str("max concurrency number reached")) } - // insert to the local, limited pipeline - select { - case j.localPrefetch <- msg: - default: - return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch)) - } + go func(jj *Item) { + atomic.AddUint64(&j.goroutinesMaxNum, 1) + time.Sleep(jj.Options.DelayDuration()) + + // send the item after timeout expired + j.localPrefetch <- jj + + atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) + }(msg) return nil } - return errors.E(op, errors.Errorf("no such pipeline: %s", jb.Options.Pipeline)) + // insert to the local, limited pipeline + select { + case j.localPrefetch <- msg: + return nil + case <-ctx.Done(): + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d, context error: %v", j.cfg.Prefetch, ctx.Err())) + } } func (j *JobConsumer) consume() { @@ -133,6 +148,9 @@ func (j *JobConsumer) consume() { for { select { case item := <-j.localPrefetch: + + // set requeue channel + item.Options.requeueCh = j.requeueCh j.pq.Insert(item) case <-j.stopCh: return @@ -140,7 +158,7 @@ func (j *JobConsumer) consume() { } } -func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) error { +func (j *JobConsumer) Register(_ context.Context, pipeline *pipeline.Pipeline) error { const op = errors.Op("ephemeral_register") if _, ok := j.pipeline.Load(pipeline.Name()); ok { return errors.E(op, errors.Errorf("queue %s has already been registered", pipeline)) @@ -151,12 +169,14 @@ func (j *JobConsumer) Register(ctx context.Context, pipeline *pipeline.Pipeline) return nil } -func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { +func (j *JobConsumer) Pause(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == true { // mark pipeline as turned off j.pipeline.Store(pipeline, false) } + // if not true - do not send the EventPipeStopped, because pipe already stopped + return } j.eh.Push(events.JobEvent{ @@ -167,12 +187,15 @@ func (j *JobConsumer) Pause(ctx context.Context, pipeline string) { }) } -func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { +func (j *JobConsumer) Resume(_ context.Context, pipeline string) { if q, ok := j.pipeline.Load(pipeline); ok { if q == false { // mark pipeline as turned on j.pipeline.Store(pipeline, true) } + + // if not true - do not send the EventPipeActive, because pipe already active + return } j.eh.Push(events.JobEvent{ @@ -184,7 +207,7 @@ func (j *JobConsumer) Resume(ctx context.Context, pipeline string) { } // Run is no-op for the ephemeral -func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { +func (j *JobConsumer) Run(_ context.Context, pipe *pipeline.Pipeline) error { j.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Driver: pipe.Driver(), @@ -194,7 +217,8 @@ func (j *JobConsumer) Run(ctx context.Context, pipe *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Stop(context.Context) error { +func (j *JobConsumer) Stop(ctx context.Context) error { + const op = errors.Op("ephemeral_plugin_stop") var pipe string j.pipeline.Range(func(key, _ interface{}) bool { pipe = key.(string) @@ -202,15 +226,18 @@ func (j *JobConsumer) Stop(context.Context) error { return true }) + select { // return from the consumer - j.stopCh <- struct{}{} - - j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, - Pipeline: pipe, - Start: time.Now(), - Elapsed: 0, - }) + case j.stopCh <- struct{}{}: + j.eh.Push(events.JobEvent{ + Event: events.EventPipeStopped, + Pipeline: pipe, + Start: time.Now(), + Elapsed: 0, + }) + return nil - return nil + case <-ctx.Done(): + return errors.E(op, ctx.Err()) + } } diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index 8560f10a..d140c9ed 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -8,20 +8,6 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -func fromJob(job *job.Job) *Item { - return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - }, - } -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -53,6 +39,9 @@ type Options struct { // Timeout defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -111,6 +100,33 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { +func (i *Item) Requeue(delay int64) error { + go func() { + time.Sleep(time.Second * time.Duration(delay)) + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return + default: + // TODO(rustatian): logs? + return + } + }() + return nil } + +func fromJob(job *job.Job) *Item { + return &Item{ + Job: job.Job, + Ident: job.Ident, + Payload: job.Payload, + Options: &Options{ + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, + }, + } +} diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go new file mode 100644 index 00000000..afb97d54 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/requeue.go @@ -0,0 +1,25 @@ +package ephemeral + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what timeout to use? + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} |