diff options
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 69 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 6 |
3 files changed, 47 insertions, 29 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index ab5aad14..32ce6ef7 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -180,7 +180,6 @@ func (j *JobConsumer) Push(jb *job.Job) error { // increase the ttr to 1. Maximum ttr is 2**32-1. id, err := j.pool.Put(bb.Bytes(), 0, item.Options.DelayDuration(), item.Options.TimeoutDuration()) if err != nil { - // TODO check for the connection error errD := j.pool.Delete(id) if errD != nil { return errors.E(op, errors.Errorf("%s:%s", err.Error(), errD.Error())) diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 2b0ff40b..71652066 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -2,6 +2,7 @@ package ephemeral import ( "sync" + "sync/atomic" "time" "github.com/spiral/errors" @@ -14,20 +15,22 @@ import ( ) const ( - pipelineSize string = "pipeline_size" + prefetch string = "prefetch" ) type Config struct { - PipelineSize uint64 `mapstructure:"pipeline_size"` + Prefetch uint64 `mapstructure:"prefetch"` } type JobBroker struct { - cfg *Config - log logger.Logger - eh events.Handler - pipeline sync.Map - pq priorityqueue.Queue - localQueue chan *Item + cfg *Config + log logger.Logger + eh events.Handler + pipeline sync.Map + pq priorityqueue.Queue + localPrefetch chan *Item + + goroutinesMaxNum uint64 stopCh chan struct{} } @@ -36,10 +39,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh const op = errors.Op("new_ephemeral_pipeline") jb := &JobBroker{ - log: log, - pq: pq, - eh: eh, - stopCh: make(chan struct{}, 1), + log: log, + pq: pq, + eh: eh, + goroutinesMaxNum: 1000, + stopCh: make(chan struct{}, 1), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -47,12 +51,12 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return nil, errors.E(op, err) } - if jb.cfg.PipelineSize == 0 { - jb.cfg.PipelineSize = 100_000 + if jb.cfg.Prefetch == 0 { + jb.cfg.Prefetch = 100_000 } // initialize a local queue - jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch) // consume from the queue go jb.consume() @@ -62,14 +66,15 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) { jb := &JobBroker{ - log: log, - pq: pq, - eh: eh, - stopCh: make(chan struct{}, 1), + log: log, + pq: pq, + eh: eh, + goroutinesMaxNum: 1000, + stopCh: make(chan struct{}, 1), } // initialize a local queue - jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000)) + jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000)) // consume from the queue go jb.consume() @@ -88,19 +93,33 @@ func (j *JobBroker) Push(jb *job.Job) error { msg := fromJob(jb) // handle timeouts - if msg.Options.Timeout > 0 { + // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example) + // goroutines here. We should limit goroutines here. + if msg.Options.Delay > 0 { + // if we have 1000 goroutines waiting on the delay - reject 1001 + if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 { + return errors.E(op, errors.Str("max concurrency number reached")) + } + go func(jj *job.Job) { - time.Sleep(jj.Options.TimeoutDuration()) + atomic.AddUint64(&j.goroutinesMaxNum, 1) + time.Sleep(jj.Options.DelayDuration()) // send the item after timeout expired - j.localQueue <- msg + j.localPrefetch <- msg + + atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0)) }(jb) return nil } // insert to the local, limited pipeline - j.localQueue <- msg + select { + case j.localPrefetch <- msg: + default: + return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch)) + } return nil } @@ -112,7 +131,7 @@ func (j *JobBroker) consume() { // redirect for { select { - case item := <-j.localQueue: + case item := <-j.localPrefetch: j.pq.Insert(item) case <-j.stopCh: return diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index e8974a59..37e25970 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -222,9 +222,9 @@ func declareEphemeralPipe(t *testing.T) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "ephemeral", - "name": "test-3", - "pipeline_size": "10000", + "driver": "ephemeral", + "name": "test-3", + "prefetch": "10000", }} er := &jobsv1beta.Empty{} |