summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/consumer.go')
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go69
1 files changed, 44 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 2b0ff40b..71652066 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -2,6 +2,7 @@ package ephemeral
import (
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
@@ -14,20 +15,22 @@ import (
)
const (
- pipelineSize string = "pipeline_size"
+ prefetch string = "prefetch"
)
type Config struct {
- PipelineSize uint64 `mapstructure:"pipeline_size"`
+ Prefetch uint64 `mapstructure:"prefetch"`
}
type JobBroker struct {
- cfg *Config
- log logger.Logger
- eh events.Handler
- pipeline sync.Map
- pq priorityqueue.Queue
- localQueue chan *Item
+ cfg *Config
+ log logger.Logger
+ eh events.Handler
+ pipeline sync.Map
+ pq priorityqueue.Queue
+ localPrefetch chan *Item
+
+ goroutinesMaxNum uint64
stopCh chan struct{}
}
@@ -36,10 +39,11 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
const op = errors.Op("new_ephemeral_pipeline")
jb := &JobBroker{
- log: log,
- pq: pq,
- eh: eh,
- stopCh: make(chan struct{}, 1),
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutinesMaxNum: 1000,
+ stopCh: make(chan struct{}, 1),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -47,12 +51,12 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return nil, errors.E(op, err)
}
- if jb.cfg.PipelineSize == 0 {
- jb.cfg.PipelineSize = 100_000
+ if jb.cfg.Prefetch == 0 {
+ jb.cfg.Prefetch = 100_000
}
// initialize a local queue
- jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+ jb.localPrefetch = make(chan *Item, jb.cfg.Prefetch)
// consume from the queue
go jb.consume()
@@ -62,14 +66,15 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Handler, pq priorityqueue.Queue) (*JobBroker, error) {
jb := &JobBroker{
- log: log,
- pq: pq,
- eh: eh,
- stopCh: make(chan struct{}, 1),
+ log: log,
+ pq: pq,
+ eh: eh,
+ goroutinesMaxNum: 1000,
+ stopCh: make(chan struct{}, 1),
}
// initialize a local queue
- jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000))
+ jb.localPrefetch = make(chan *Item, pipeline.Int(prefetch, 100_000))
// consume from the queue
go jb.consume()
@@ -88,19 +93,33 @@ func (j *JobBroker) Push(jb *job.Job) error {
msg := fromJob(jb)
// handle timeouts
- if msg.Options.Timeout > 0 {
+ // theoretically, some bad user may send a millions requests with a delay and produce a billion (for example)
+ // goroutines here. We should limit goroutines here.
+ if msg.Options.Delay > 0 {
+ // if we have 1000 goroutines waiting on the delay - reject 1001
+ if atomic.LoadUint64(&j.goroutinesMaxNum) >= 1000 {
+ return errors.E(op, errors.Str("max concurrency number reached"))
+ }
+
go func(jj *job.Job) {
- time.Sleep(jj.Options.TimeoutDuration())
+ atomic.AddUint64(&j.goroutinesMaxNum, 1)
+ time.Sleep(jj.Options.DelayDuration())
// send the item after timeout expired
- j.localQueue <- msg
+ j.localPrefetch <- msg
+
+ atomic.AddUint64(&j.goroutinesMaxNum, ^uint64(0))
}(jb)
return nil
}
// insert to the local, limited pipeline
- j.localQueue <- msg
+ select {
+ case j.localPrefetch <- msg:
+ default:
+ return errors.E(op, errors.Errorf("local pipeline is full, consider to increase prefetch number, current limit: %d", j.cfg.Prefetch))
+ }
return nil
}
@@ -112,7 +131,7 @@ func (j *JobBroker) consume() {
// redirect
for {
select {
- case item := <-j.localQueue:
+ case item := <-j.localPrefetch:
j.pq.Insert(item)
case <-j.stopCh:
return