diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 2b0ff40b..71652066 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -2,6 +2,7 @@ package ephemeral import ( "sync" + "sync/atomic" "time" "github.com/spiral/errors" @@ -14,20 +15,22 @@ import ( ) const ( - pipelineSize string = "pipeline_size" + prefetch string = "prefetch" ) type Config struct { - PipelineSize uint64 `mapstructure:"pipeline_size"` + Prefetch uint64 `mapstructure:"prefetch"` } type JobBroker struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline sync.Map - pq priorityqueue.Queue - localQueue chan *Item + cfg *Config + log logger.Logger + eh events.Handler + pipeline sync.Map + pq priorityqueue.Queue + localPrefetch chan *Item + + goroutinesMaxNum uint64 stopCh chan struct{} } @@ -36,10 +39,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ - log: log, - pq: pq, - eh: eh, - stopCh: make(chan struct{}, 1), + log: log, + pq: pq, + eh: eh, + goroutinesMaxNum: 1000, + stopCh: make(chan struct{}, 1), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -47,12 +51,12 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return nil, errors.E(op, err) } - if jb.cfg.PipelineSize == 0 { - jb.cfg.PipelineSize = 100_000 + if jb.cfg.Prefetch == 0 { + jb.cfg.Prefetch = 100_000 } // initialize a local queue - jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) // consume from the queue go jb.consume() @@ -62,14 +66,15 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - log: log, - pq: pq, - eh: eh, - stopCh: make(chan struct{}, 1), + log: log, + pq: pq, + eh: eh, + goroutinesMaxNum: 1000, + stopCh: make(chan struct{}, 1), } // initialize a local queue - jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000)) + jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) // consume from the queue go jb.consume() @@ -88,19 +93,33 @@ func (j *JobBroker) Push(jb *job.Job) error { msg := fromJob(jb) // handle timeouts - if msg.Options.Timeout > 0 { + // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { + return errors.E(op, errors.Str("max concurrency number reached")) + } + go func(jj *job.Job) { - time.Sleep(jj.Options.TimeoutDuration()) + atomic.AddUint64(&j.goroutinesMaxNum, 1) + time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired - j.localQueue <- msg + j.localPrefetch <- msg + + atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) }(jb) return nil } // insert to the local, limited pipeline - j.localQueue <- msg + select { + case j.localPrefetch <- msg: + default: + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch)) + } return nil } @@ -112,7 +131,7 @@ func (j *JobBroker) consume() { // redirect for { select { - case item := <-j.localQueue: + case item := <-j.localPrefetch: j.pq.Insert(item) case <-j.stopCh: return |